http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.cc
new file mode 100644
index 0000000..d29f1e9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.cc
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "namenode_info.h"
+
+#include "common/util.h"
+#include "common/logging.h"
+
+#include <sstream>
+#include <utility>
+#include <future>
+#include <memory>
+
+namespace hdfs {
+
+ResolvedNamenodeInfo& ResolvedNamenodeInfo::operator=(const NamenodeInfo &info) {
+  nameservice = info.nameservice;
+  name = info.name;
+  uri = info.uri;
+  return *this;
+}
+
+
+
+std::string ResolvedNamenodeInfo::str() const {
+  std::stringstream ss;
+  ss << "ResolvedNamenodeInfo {nameservice: " << nameservice << ", name: " << 
name << ", uri: " << uri.str();
+  ss << ", host: " << uri.get_host();
+
+  if(uri.has_port())
+    ss << ", port: " << uri.get_port();
+  else
+    ss << ", invalid port (uninitialized)";
+
+  ss << ", scheme: " << uri.get_scheme();
+
+  ss << " [";
+  for(unsigned int i=0;i<endpoints.size();i++)
+    ss << endpoints[i] << " ";
+  ss << "] }";
+
+  return ss.str();
+}
+
+
+bool ResolveInPlace(::asio::io_service *ioservice, ResolvedNamenodeInfo &info) {
+  // This isn't very memory friendly, but if it needs to be called often there are bigger issues at hand.
+  info.endpoints.clear();
+  std::vector<ResolvedNamenodeInfo> resolved = BulkResolve(ioservice, {info});
+  if(resolved.size() != 1)
+    return false;
+
+  info.endpoints = resolved[0].endpoints;
+  if(info.endpoints.size() == 0)
+    return false;
+  return true;
+}
+
+typedef std::vector<asio::ip::tcp::endpoint> endpoint_vector;
+
+// RAII wrapper
+class ScopedResolver {
+ private:
+  ::asio::io_service *io_service_;
+  std::string host_;
+  std::string port_;
+  ::asio::ip::tcp::resolver::query query_;
+  ::asio::ip::tcp::resolver resolver_;
+  endpoint_vector endpoints_;
+
+  // Caller blocks on access if resolution isn't finished
+  std::shared_ptr<std::promise<Status>> result_status_;
+ public:
+  ScopedResolver(::asio::io_service *service, const std::string &host, const std::string &port) :
+        io_service_(service), host_(host), port_(port), query_(host, port), resolver_(*io_service_)
+  {
+    if(!io_service_)
+      LOG_ERROR(kAsyncRuntime, << "ScopedResolver@" << this << " passed nullptr to io_service");
+  }
+
+  ~ScopedResolver() {
+    resolver_.cancel();
+  }
+
+  bool BeginAsyncResolve() {
+    // result_status_ would only exist if this was previously called.  Invalid state.
+    if(result_status_) {
+      LOG_ERROR(kAsyncRuntime, << "ScopedResolver@" << this << "::BeginAsyncResolve invalid call: may only be called once per instance");
+      return false;
+    } else if(!io_service_) {
+      LOG_ERROR(kAsyncRuntime, << "ScopedResolver@" << this << "::BeginAsyncResolve invalid call: null io_service");
+      return false;
+    }
+
+    // Now set up the promise, set it in async_resolve's callback
+    result_status_ = std::make_shared<std::promise<Status>>();
+    std::shared_ptr<std::promise<Status>> shared_result = result_status_;
+
+    // Callback to pull a copy of endpoints out of resolver and set promise
+    auto callback = [this, shared_result](const asio::error_code &ec, ::asio::ip::tcp::resolver::iterator out) {
+      if(!ec) {
+        std::copy(out, ::asio::ip::tcp::resolver::iterator(), std::back_inserter(endpoints_));
+      }
+      shared_result->set_value( ToStatus(ec) );
+    };
+    resolver_.async_resolve(query_, callback);
+    return true;
+  }
+
+  Status Join() {
+    if(!result_status_) {
+      std::ostringstream errmsg;
+      errmsg << "ScopedResolver@" << this << "::Join invalid call: promise never set";
+      return Status::InvalidArgument(errmsg.str().c_str());
+    }
+
+    std::future<Status> future_result = result_status_->get_future();
+    Status res = future_result.get();
+    return res;
+  }
+
+  endpoint_vector GetEndpoints() {
+    // Explicitly return by value to decouple lifecycles.
+    return endpoints_;
+  }
+};
+
+std::vector<ResolvedNamenodeInfo> BulkResolve(::asio::io_service *ioservice, const std::vector<NamenodeInfo> &nodes) {
+  std::vector< std::unique_ptr<ScopedResolver> > resolvers;
+  resolvers.reserve(nodes.size());
+
+  std::vector<ResolvedNamenodeInfo> resolved_info;
+  resolved_info.reserve(nodes.size());
+
+  for(unsigned int i=0; i<nodes.size(); i++) {
+    std::string host = nodes[i].get_host();
+    std::string port = nodes[i].get_port();
+
+    resolvers.emplace_back(new ScopedResolver(ioservice, host, port));
+    resolvers[i]->BeginAsyncResolve();
+  }
+
+  // Join all async operations
+  for(unsigned int i=0; i < resolvers.size(); i++) {
+    Status asyncReturnStatus = resolvers[i]->Join();
+
+    ResolvedNamenodeInfo info;
+    info = nodes[i];
+
+    if(asyncReturnStatus.ok()) {
+      // Copy out endpoints if things went well
+      info.endpoints = resolvers[i]->GetEndpoints();
+    } else {
+      LOG_ERROR(kAsyncRuntime, << "Unabled to resolve endpoints for host: " << 
nodes[i].get_host()
+                                                               << " port: " << 
nodes[i].get_port());
+    }
+
+    resolved_info.push_back(info);
+  }
+  return resolved_info;
+}
+
+}
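
A minimal usage sketch of the resolver above (not part of the patch). It assumes NamenodeInfo is default-constructible with an assignable uri member, as operator= above suggests, and that the io_service is serviced by a worker thread, since ScopedResolver::Join() blocks until the async_resolve callback runs:

    // needs <thread> and <iostream> in addition to namenode_info.h
    ::asio::io_service io_service;
    ::asio::io_service::work keep_alive(io_service);  // keep run() from returning early
    std::thread worker([&io_service]() { io_service.run(); });

    NamenodeInfo nn1, nn2;
    nn1.uri = URI::parse_from_string("hdfs://nn1.example.com:8020");  // hypothetical hosts
    nn2.uri = URI::parse_from_string("hdfs://nn2.example.com:8020");

    // Lookups run in parallel; entries with empty endpoints failed resolution.
    std::vector<ResolvedNamenodeInfo> resolved = BulkResolve(&io_service, {nn1, nn2});
    for (const ResolvedNamenodeInfo &info : resolved)
      std::cout << info.str() << std::endl;

    io_service.stop();
    worker.join();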

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.h
new file mode 100644
index 0000000..fdee8d7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.h
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef COMMON_HDFS_NAMENODE_INFO_H_
+#define COMMON_HDFS_NAMENODE_INFO_H_
+
+#include <asio.hpp>
+#include <hdfspp/options.h>
+
+#include <string>
+#include <vector>
+
+namespace hdfs {
+
+// Internal representation of namenode info that keeps track
+// of its endpoints.
+struct ResolvedNamenodeInfo : public NamenodeInfo {
+  ResolvedNamenodeInfo& operator=(const NamenodeInfo &info);
+  std::string str() const;
+
+  std::vector<::asio::ip::tcp::endpoint> endpoints;
+};
+
+// Clear endpoints if set and resolve all of them in parallel.
+// Only successful lookups will be placed in the result set.
+std::vector<ResolvedNamenodeInfo> BulkResolve(::asio::io_service *ioservice, const std::vector<NamenodeInfo> &nodes);
+
+// Clear endpoints, if any, and resolve them again
+// Return true if endpoints were resolved
+bool ResolveInPlace(::asio::io_service *ioservice, ResolvedNamenodeInfo &info);
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/new_delete.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/new_delete.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/new_delete.h
new file mode 100644
index 0000000..48f97ca
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/new_delete.h
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef COMMON_HDFS_NEW_DELETE_H_
+#define COMMON_HDFS_NEW_DELETE_H_
+
+#include <cstring>
+#include <cstdlib>
+
+struct mem_struct {
+  size_t mem_size;
+};
+
+#ifndef NDEBUG
+#define MEMCHECKED_CLASS(clazz) \
+static void* operator new(size_t size) { \
+  void* p = ::malloc(size); \
+  return p; \
+} \
+static void* operator new[](size_t size) { \
+  mem_struct* p = (mem_struct*)::malloc(sizeof(mem_struct) + size); \
+  p->mem_size = size; \
+  return (void*)++p; \
+} \
+static void operator delete(void* p) { \
+  ::memset(p, 0, sizeof(clazz)); \
+  ::free(p); \
+} \
+static void operator delete[](void* p) { \
+  mem_struct* header = (mem_struct*)p; \
+  size_t size = (--header)->mem_size; \
+  ::memset(p, 0, size); \
+  ::free(header); \
+}
+#else
+#define MEMCHECKED_CLASS(clazz)
+#endif
+#endif
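
A sketch of how MEMCHECKED_CLASS is meant to be used (ExampleBuffer is hypothetical). In debug builds the scalar delete zeroes the object's bytes before freeing, so use-after-free reads hit zeroed memory instead of plausible stale data; the array forms stash the allocation size in a mem_struct header so delete[] knows how many bytes to scrub. With NDEBUG the macro expands to nothing:

    class ExampleBuffer {
     public:
      MEMCHECKED_CLASS(ExampleBuffer)
      int value = 0;
    };

    void Demo() {
      ExampleBuffer *one = new ExampleBuffer();   // class-scoped operator new
      delete one;       // memset(p, 0, sizeof(ExampleBuffer)), then free
      ExampleBuffer *many = new ExampleBuffer[4]; // mem_struct header + payload
      delete[] many;    // scrub the payload size recorded in the header, free header
    }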

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/optional_wrapper.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/optional_wrapper.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/optional_wrapper.h
new file mode 100644
index 0000000..2d15dc3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/optional_wrapper.h
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef COMMON_OPTIONAL_WRAPPER_H_
+#define COMMON_OPTIONAL_WRAPPER_H_
+
+#ifdef __clang__
+  #pragma clang diagnostic push
+  #if __has_warning("-Wweak-vtables")
+    #pragma clang diagnostic ignored "-Wweak-vtables"
+  #endif
+  #if __has_warning("-Wreserved-id-macro")
+    #pragma clang diagnostic ignored "-Wreserved-id-macro"
+  #endif
+  #if __has_warning("-Wextra-semi")
+    #pragma clang diagnostic ignored "-Wextra-semi"
+  #endif
+  #define TR2_OPTIONAL_DISABLE_EMULATION_OF_TYPE_TRAITS  //For Clang < 3_4_2
+#endif
+
+#include <optional.hpp>
+
+#ifdef __clang__
+  #undef TR2_OPTIONAL_DISABLE_EMULATION_OF_TYPE_TRAITS  //For Clang < 3_4_2
+  #pragma clang diagnostic pop
+#endif
+
+#endif //COMMON_OPTIONAL_WRAPPER_H_

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc
new file mode 100644
index 0000000..48c83f1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "hdfspp/options.h"
+
+namespace hdfs {
+
+// The linker needs a place to put all of those constants
+const int Options::kDefaultRpcTimeout;
+const int Options::kNoRetry;
+const int Options::kDefaultMaxRpcRetries;
+const int Options::kDefaultRpcRetryDelayMs;
+const unsigned int Options::kDefaultHostExclusionDuration;
+const unsigned int Options::kDefaultFailoverMaxRetries;
+const unsigned int Options::kDefaultFailoverConnectionMaxRetries;
+const long Options::kDefaultBlockSize;
+
+Options::Options() : rpc_timeout(kDefaultRpcTimeout),
+                     rpc_connect_timeout(kDefaultRpcConnectTimeout),
+                     max_rpc_retries(kDefaultMaxRpcRetries),
+                     rpc_retry_delay_ms(kDefaultRpcRetryDelayMs),
+                     host_exclusion_duration(kDefaultHostExclusionDuration),
+                     defaultFS(),
+                     failover_max_retries(kDefaultFailoverMaxRetries),
+                     failover_connection_max_retries(kDefaultFailoverConnectionMaxRetries),
+                     authentication(kDefaultAuthentication),
+                     block_size(kDefaultBlockSize),
+                     io_threads_(kDefaultIoThreads)
+{
+
+}
+
+std::string NamenodeInfo::get_host() const {
+  return uri.get_host();
+}
+
+std::string NamenodeInfo::get_port() const {
+  if(uri.has_port()) {
+    return std::to_string(uri.get_port());
+  }
+  return "-1";
+}
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/retry_policy.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/retry_policy.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/retry_policy.cc
new file mode 100644
index 0000000..dca49fb
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/retry_policy.cc
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "common/retry_policy.h"
+#include "common/logging.h"
+
+#include <sstream>
+
+namespace hdfs {
+
+RetryAction FixedDelayRetryPolicy::ShouldRetry(
+    const Status &s, uint64_t retries, uint64_t failovers,
+    bool isIdempotentOrAtMostOnce) const {
+  LOG_TRACE(kRPC, << "FixedDelayRetryPolicy::ShouldRetry(retries=" << retries << ", failovers=" << failovers << ")");
+  (void)isIdempotentOrAtMostOnce;
+  if (retries + failovers >= max_retries_) {
+    return RetryAction::fail(
+        "Failovers and retries(" + std::to_string(retries + failovers) +
+        ") exceeded maximum retries (" + std::to_string(max_retries_) + "), 
Status: " +
+        s.ToString());
+  } else {
+    return RetryAction::retry(delay_);
+  }
+}
+
+
+RetryAction NoRetryPolicy::ShouldRetry(
+    const Status &s, uint64_t retries, uint64_t failovers,
+    bool isIdempotentOrAtMostOnce) const {
+  LOG_TRACE(kRPC, << "NoRetryPolicy::ShouldRetry(retries=" << retries << ", failovers=" << failovers << ")");
+  (void)retries;
+  (void)failovers;
+  (void)isIdempotentOrAtMostOnce;
+  return RetryAction::fail("No retry, Status: " + s.ToString());
+}
+
+
+RetryAction FixedDelayWithFailover::ShouldRetry(const Status &s, uint64_t retries,
+    uint64_t failovers,
+    uint64_t failovers,
+    bool isIdempotentOrAtMostOnce) const {
+  (void)isIdempotentOrAtMostOnce;
+  (void)max_failover_conn_retries_;
+  LOG_TRACE(kRPC, << "FixedDelayWithFailover::ShouldRetry(retries=" << retries << ", failovers=" << failovers << ")");
+
+  if(failovers < max_failover_retries_ &&
+     (s.code() == ::asio::error::timed_out || s.get_server_exception_type() == Status::kStandbyException))
+  {
+    // Try connecting to another NN in case this one keeps timing out
+    // Can add the backoff wait specified by dfs.client.failover.sleep.base.millis here
+    if(failovers == 0) {
+      // No delay on first failover if it looks like the NN was bad.
+      return RetryAction::failover(0);
+    } else {
+      return RetryAction::failover(delay_);
+    }
+  }
+
+  if(retries < max_retries_ && failovers < max_failover_retries_) {
+    LOG_TRACE(kRPC, << "FixedDelayWithFailover::ShouldRetry: retries < max_retries_ && failovers < max_failover_retries_");
+    return RetryAction::retry(delay_);
+  } else if (retries >= max_retries_ && failovers < max_failover_retries_) {
+    LOG_TRACE(kRPC, << "FixedDelayWithFailover::ShouldRetry: retries >= max_retries_ && failovers < max_failover_retries_");
+    return RetryAction::failover(delay_);
+  } else if (retries <= max_retries_ && failovers == max_failover_retries_) {
+    LOG_TRACE(kRPC, << "FixedDelayWithFailover::ShouldRetry: retries <= max_retries_ && failovers == max_failover_retries_");
+    // 1 last retry on new connection
+    return RetryAction::retry(delay_);
+  }
+
+  return RetryAction::fail("Retry and failover didn't work, Status: " + 
s.ToString());
+}
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/retry_policy.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/retry_policy.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/retry_policy.h
new file mode 100644
index 0000000..0b5bd80
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/retry_policy.h
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIB_COMMON_RETRY_POLICY_H_
+#define LIB_COMMON_RETRY_POLICY_H_
+
+#include "common/util.h"
+
+#include <string>
+#include <stdint.h>
+
+namespace hdfs {
+
+class RetryAction {
+ public:
+  enum RetryDecision { FAIL, RETRY, FAILOVER_AND_RETRY };
+
+  RetryDecision action;
+  uint64_t delayMillis;
+  std::string reason;
+
+  RetryAction(RetryDecision in_action, uint64_t in_delayMillis,
+              const std::string &in_reason)
+      : action(in_action), delayMillis(in_delayMillis), reason(in_reason) {}
+
+  static RetryAction fail(const std::string &reason) {
+    return RetryAction(FAIL, 0, reason);
+  }
+  static RetryAction retry(uint64_t delay) {
+    return RetryAction(RETRY, delay, "");
+  }
+  static RetryAction failover(uint64_t delay) {
+    return RetryAction(FAILOVER_AND_RETRY, delay, "");
+  }
+
+  std::string decision_str() const {
+    switch(action) {
+      case FAIL: return "FAIL";
+      case RETRY: return "RETRY";
+      case FAILOVER_AND_RETRY: return "FAILOVER_AND_RETRY";
+      default: return "UNDEFINED ACTION";
+    }
+  };
+};
+
+class RetryPolicy {
+ protected:
+  uint64_t delay_;
+  uint64_t max_retries_;
+  RetryPolicy(uint64_t delay, uint64_t max_retries) :
+              delay_(delay), max_retries_(max_retries) {}
+
+ public:
+  RetryPolicy() : delay_(0), max_retries_(0) {}
+
+  virtual ~RetryPolicy() {}
+  /*
+   * If there was an error in communications, responds with the configured
+   * action to take.
+   */
+  virtual RetryAction ShouldRetry(const Status &s, uint64_t retries,
+                                            uint64_t failovers,
+                                            bool isIdempotentOrAtMostOnce) const = 0;
+
+  virtual std::string str() const { return "Base RetryPolicy"; }
+};
+
+
+/*
+ * Overview of how the failover retry policy works:
+ *
+ * 1) Acts the same as FixedDelayRetryPolicy in terms of connection retries against a single NN
+ *    with two differences:
+ *      a) If we have retried more than the maximum number of retries we will failover to the
+ *         other node and reset the retry counter rather than error out.  It will begin the same
+ *         routine on the other node.
+ *      b) If an attempted connection times out and max_failover_conn_retries_ is less than the
+ *         normal number of retries it will failover sooner.  The connection timeout retry limit
+ *         defaults to zero; the idea being that if a node is unresponsive it's better to just
+ *         try the secondary rather than incur the timeout cost multiple times.
+ *
+ * 2) Keeps track of the failover count in the same way that the retry count is tracked.  If failover
+ *    is triggered more than a set number (dfs.client.failover.max.attempts) of times then the operation
+ *    will error out in the same way that a non-HA operation would error if it ran out of retries.
+ *
+ * 3) Failover between namenodes isn't instantaneous, so the RPC retry delay is reused to add a small
+ *    delay between failover attempts.  This helps prevent the client from quickly using up all of
+ *    its failover attempts while thrashing between namenodes that are both temporarily marked standby.
+ *    Note: The Java client implements exponential backoff here with a base other than the RPC delay,
+ *    and this implementation will do the same in the future.  For now it does no exponential backoff,
+ *    and the class can be renamed to ExponentialDelayWithFailover when backoff is implemented.
+ */
+class FixedDelayWithFailover : public RetryPolicy {
+ public:
+  FixedDelayWithFailover(uint64_t delay, uint64_t max_retries,
+                         uint64_t max_failover_retries,
+                         uint64_t max_failover_conn_retries)
+      : RetryPolicy(delay, max_retries), max_failover_retries_(max_failover_retries),
+        max_failover_conn_retries_(max_failover_conn_retries) {}
+
+  RetryAction ShouldRetry(const Status &s, uint64_t retries,
+                          uint64_t failovers,
+                          bool isIdempotentOrAtMostOnce) const override;
+
+  std::string str() const override { return "FixedDelayWithFailover"; }
+
+ private:
+  // Attempts to fail over
+  uint64_t max_failover_retries_;
+  // Attempts to fail over if a connection times out rather than trying to
+  // connect and wait for the timeout delay max_failover_retries_ times.
+  uint64_t max_failover_conn_retries_;
+};
+
+
+/*
+ * Returns a fixed delay up to a certain number of retries
+ */
+class FixedDelayRetryPolicy : public RetryPolicy {
+ public:
+  FixedDelayRetryPolicy(uint64_t delay, uint64_t max_retries)
+      : RetryPolicy(delay, max_retries) {}
+
+  RetryAction ShouldRetry(const Status &s, uint64_t retries,
+                          uint64_t failovers,
+                          bool isIdempotentOrAtMostOnce) const override;
+
+  std::string str() const override { return "FixedDelayRetryPolicy"; }
+};
+
+/*
+ * Never retries
+ */
+class NoRetryPolicy : public RetryPolicy {
+ public:
+  NoRetryPolicy() {};
+  RetryAction ShouldRetry(const Status &s, uint64_t retries,
+                          uint64_t failovers,
+                          bool isIdempotentOrAtMostOnce) const override;
+
+  std::string str() const override { return "NoRetryPolicy"; }
+};
+}
+
+#endif
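
A hedged sketch of how a caller might drive these policies; the real RPC engine elsewhere in libhdfs++ owns the retry and failover counters, and the numbers below are illustrative:

    FixedDelayWithFailover policy(/*delay=*/1000, /*max_retries=*/4,
                                  /*max_failover_retries=*/4,
                                  /*max_failover_conn_retries=*/0);
    uint64_t retries = 0, failovers = 0;
    Status last_error = Status::Error("connection reset");

    RetryAction action = policy.ShouldRetry(last_error, retries, failovers,
                                            /*isIdempotentOrAtMostOnce=*/true);
    switch (action.action) {
      case RetryAction::RETRY:
        ++retries;            // sleep action.delayMillis, retry the same NN
        break;
      case RetryAction::FAILOVER_AND_RETRY:
        ++failovers;
        retries = 0;          // per 1a) above, the counter resets on the new NN
        break;
      case RetryAction::FAIL:
        // surface action.reason to the caller
        break;
    }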

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/sasl_authenticator.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/sasl_authenticator.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/sasl_authenticator.h
new file mode 100644
index 0000000..78b2a55
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/sasl_authenticator.h
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIB_COMMON_SASL_AUTHENTICATOR_H_
+#define LIB_COMMON_SASL_AUTHENTICATOR_H_
+
+#include "hdfspp/status.h"
+
+namespace hdfs {
+
+class DigestMD5AuthenticatorTest_TestResponse_Test;
+
+/**
+ * A specialized implementation of RFC 2831 for the HDFS
+ * DataTransferProtocol.
+ *
+ * The current implementation lacks the following features:
+ *   * Encoding the username, realm, and password in ISO-8859-1 when
+ * it is required by the RFC. They are always encoded in UTF-8.
+ *   * Checking whether the challenges from the server are
+ * well-formed.
+ *   * Specifying authzid, digest-uri and maximum buffer size.
+ *   * Supporting QOP other than the auth level.
+ **/
+class DigestMD5Authenticator {
+public:
+  Status EvaluateResponse(const std::string &payload, std::string *result);
+  DigestMD5Authenticator(const std::string &username,
+                         const std::string &password, bool mock_nonce = false);
+
+private:
+  Status GenerateFirstResponse(std::string *result);
+  Status GenerateResponseValue(std::string *response_value);
+  Status ParseFirstChallenge(const std::string &payload);
+
+  static size_t NextToken(const std::string &payload, size_t off,
+                          std::string *tok);
+  void GenerateCNonce();
+  std::string username_;
+  std::string password_;
+  std::string nonce_;
+  std::string cnonce_;
+  std::string realm_;
+  std::string qop_;
+  unsigned nonce_count_;
+
+  const bool TEST_mock_cnonce_;
+  friend class DigestMD5AuthenticatorTest_TestResponse_Test;
+};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/sasl_digest_md5.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/sasl_digest_md5.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/sasl_digest_md5.cc
new file mode 100644
index 0000000..3ca8578
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/sasl_digest_md5.cc
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "sasl_authenticator.h"
+
+#include "common/util.h"
+
+#include <openssl/rand.h>
+#include <openssl/md5.h>
+
+#include <iomanip>
+#include <map>
+#include <sstream>
+
+namespace hdfs {
+
+static std::string QuoteString(const std::string &src);
+static std::string GetMD5Digest(const std::string &src);
+static std::string BinaryToHex(const std::string &src);
+
+static const char kDigestUri[] = "hdfs/0";
+static const size_t kMaxBufferSize = 65536;
+
+DigestMD5Authenticator::DigestMD5Authenticator(const std::string &username,
+                                               const std::string &password,
+                                               bool mock_nonce)
+    : username_(username), password_(password), nonce_count_(0),
+      TEST_mock_cnonce_(mock_nonce) {}
+
+Status DigestMD5Authenticator::EvaluateResponse(const std::string &payload,
+                                                std::string *result) {
+  Status status = ParseFirstChallenge(payload);
+  if (status.ok()) {
+    status = GenerateFirstResponse(result);
+  }
+  return status;
+}
+
+size_t DigestMD5Authenticator::NextToken(const std::string &payload, size_t off,
+                                         std::string *tok) {
+  tok->clear();
+  if (off >= payload.size()) {
+    return std::string::npos;
+  }
+
+  char c = payload[off];
+  if (c == '=' || c == ',') {
+    *tok = c;
+    return off + 1;
+  }
+
+  int quote_count = 0;
+  for (; off < payload.size(); ++off) {
+    char c = payload[off];
+    if (c == '"') {
+      ++quote_count;
+      if (quote_count == 2) {
+        return off + 1;
+      }
+      continue;
+    }
+
+    if (c == '=') {
+      if (quote_count) {
+        tok->append(&c, 1);
+      } else {
+        break;
+      }
+    } else if (('0' <= c && c <= '9') || ('a' <= c && c <= 'z') ||
+               ('A' <= c && c <= 'Z') || c == '+' || c == '/' || c == '-' ||
+               c == '_' || c == '@') {
+      tok->append(&c, 1);
+    } else {
+      break;
+    }
+  }
+  return off;
+}
+
+void DigestMD5Authenticator::GenerateCNonce() {
+  if (!TEST_mock_cnonce_) {
+    char buf[8] = {0,};
+    RAND_pseudo_bytes(reinterpret_cast<unsigned char *>(buf), sizeof(buf));
+    cnonce_ = Base64Encode(std::string(buf, sizeof(buf)));
+  }
+}
+
+Status DigestMD5Authenticator::ParseFirstChallenge(const std::string &payload) {
+  std::map<std::string, std::string> props;
+  std::string token;
+  enum {
+    kStateLVal,
+    kStateEqual,
+    kStateRVal,
+    kStateCommaOrEnd,
+  };
+
+  int state = kStateLVal;
+
+  std::string lval, rval;
+  size_t off = 0;
+  while (true) {
+    off = NextToken(payload, off, &token);
+    if (off == std::string::npos) {
+      break;
+    }
+
+    switch (state) {
+    case kStateLVal:
+      lval = token;
+      state = kStateEqual;
+      break;
+    case kStateEqual:
+      state = kStateRVal;
+      break;
+    case kStateRVal:
+      rval = token;
+      props[lval] = rval;
+      state = kStateCommaOrEnd;
+      break;
+    case kStateCommaOrEnd:
+      state = kStateLVal;
+      break;
+    }
+  }
+
+  if (props["algorithm"] != "md5-sess" || props["charset"] != "utf-8" ||
+      props.find("nonce") == props.end()) {
+    return Status::Error("Invalid challenge");
+  }
+  realm_ = props["realm"];
+  nonce_ = props["nonce"];
+  qop_ = props["qop"];
+  return Status::OK();
+}
+
+Status DigestMD5Authenticator::GenerateFirstResponse(std::string *result) {
+  // TODO: Support auth-int and auth-conf
+  // Handle cipher
+  if (qop_ != "auth") {
+    return Status::Unimplemented();
+  }
+
+  std::stringstream ss;
+  GenerateCNonce();
+  ss << "charset=utf-8,username=\"" << QuoteString(username_) << "\""
+     << ",authzid=\"" << QuoteString(username_) << "\""
+     << ",nonce=\"" << QuoteString(nonce_) << "\""
+     << ",digest-uri=\"" << kDigestUri << "\""
+     << ",maxbuf=" << kMaxBufferSize << ",cnonce=\"" << cnonce_ << "\"";
+
+  if (realm_.size()) {
+    ss << ",realm=\"" << QuoteString(realm_) << "\"";
+  }
+
+  ss << ",nc=" << std::hex << std::setw(8) << std::setfill('0')
+     << ++nonce_count_;
+  std::string response_value;
+  GenerateResponseValue(&response_value);
+  ss << ",response=" << response_value;
+  *result = ss.str();
+  return result->size() > 4096 ? Status::Error("Response too big")
+                               : Status::OK();
+}
+
+/**
+ * Generate the response value specified in S 2.1.2.1 in RFC2831.
+ **/
+Status
+DigestMD5Authenticator::GenerateResponseValue(std::string *response_value) {
+  std::stringstream begin_a1, a1_ss;
+  std::string a1, a2;
+
+  if (qop_ == "auth") {
+    a2 = std::string("AUTHENTICATE:") + kDigestUri;
+  } else {
+    a2 = std::string("AUTHENTICATE:") + kDigestUri +
+         ":00000000000000000000000000000000";
+  }
+
+  begin_a1 << username_ << ":" << realm_ << ":" << password_;
+  a1_ss << GetMD5Digest(begin_a1.str()) << ":" << nonce_ << ":" << cnonce_
+        << ":" << username_;
+
+  std::stringstream combine_ss;
+  combine_ss << BinaryToHex(GetMD5Digest(a1_ss.str())) << ":" << nonce_ << ":"
+             << std::hex << std::setw(8) << std::setfill('0') << nonce_count_
+             << ":" << cnonce_ << ":" << qop_ << ":"
+             << BinaryToHex(GetMD5Digest(a2));
+  *response_value = BinaryToHex(GetMD5Digest(combine_ss.str()));
+  return Status::OK();
+}
+
+static std::string QuoteString(const std::string &src) {
+  std::string dst;
+  dst.resize(2 * src.size());
+  size_t j = 0;
+  for (size_t i = 0; i < src.size(); ++i) {
+    if (src[i] == '"') {
+      dst[j++] = '\\';
+    }
+    dst[j++] = src[i];
+  }
+  dst.resize(j);
+  return dst;
+}
+
+static std::string GetMD5Digest(const std::string &src) {
+  MD5_CTX ctx;
+  unsigned long long res[2];
+  MD5_Init(&ctx);
+  MD5_Update(&ctx, src.c_str(), src.size());
+  MD5_Final(reinterpret_cast<unsigned char *>(res), &ctx);
+  return std::string(reinterpret_cast<char *>(res), sizeof(res));
+}
+
+static std::string BinaryToHex(const std::string &src) {
+  std::stringstream ss;
+  ss << std::hex << std::setfill('0');
+  for (size_t i = 0; i < src.size(); ++i) {
+    unsigned c = (unsigned)(static_cast<unsigned char>(src[i]));
+    ss << std::setw(2) << c;
+  }
+  return ss.str();
+}
+}
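
A sketch of driving the authenticator above with an illustrative first challenge (the field values are made up; a real challenge comes from the server). ParseFirstChallenge requires algorithm=md5-sess, charset=utf-8, and a nonce, and GenerateFirstResponse currently supports only qop=auth:

    DigestMD5Authenticator auth("hdfs_user", "hdfs_password");
    std::string challenge =
        "realm=\"testrealm\",nonce=\"AAECAwQFBgc=\","
        "qop=\"auth\",charset=utf-8,algorithm=md5-sess";
    std::string response;
    Status s = auth.EvaluateResponse(challenge, &response);
    // On success, response holds the digest-response directives (username,
    // cnonce, nc, response=..., etc.) ready to send back to the server.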

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/statinfo.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/statinfo.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/statinfo.cc
new file mode 100644
index 0000000..2fb744f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/statinfo.cc
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hdfspp/statinfo.h>
+#include <sys/stat.h>
+#include <sstream>
+#include <iomanip>
+
+namespace hdfs {
+
+StatInfo::StatInfo()
+  : file_type(0),
+    length(0),
+    permissions(0),
+    modification_time(0),
+    access_time(0),
+    block_replication(0),
+    blocksize(0),
+    fileid(0),
+    children_num(0) {
+}
+
+std::string StatInfo::str() const {
+  char perms[11];
+  perms[0] = file_type == StatInfo::IS_DIR ? 'd' : '-';
+  perms[1] = permissions & S_IRUSR ? 'r' : '-';
+  perms[2] = permissions & S_IWUSR ? 'w' : '-';
+  perms[3] = permissions & S_IXUSR ? 'x' : '-';
+  perms[4] = permissions & S_IRGRP ? 'r' : '-';
+  perms[5] = permissions & S_IWGRP ? 'w' : '-';
+  perms[6] = permissions & S_IXGRP ? 'x' : '-';
+  perms[7] = permissions & S_IROTH ? 'r' : '-';
+  perms[8] = permissions & S_IWOTH ? 'w' : '-';
+  perms[9] = permissions & S_IXOTH ? 'x' : '-';
+  perms[10] = 0;
+
+  // Convert from milliseconds to seconds
+  const int time_field_length = 17;
+  time_t rawtime = modification_time/1000;
+  struct tm * timeinfo;
+  char buffer[time_field_length];
+  timeinfo = localtime(&rawtime);
+
+  strftime(buffer,time_field_length,"%Y-%m-%d %H:%M",timeinfo);
+  buffer[time_field_length-1] = 0;  //null terminator
+  std::string time(buffer);
+
+  std::stringstream ss;
+  ss  << std::left << std::setw(12) << perms
+      << std::left << std::setw(3) << (!block_replication ? "-" : std::to_string(block_replication))
+      << std::left << std::setw(15) << owner
+      << std::left << std::setw(15) << group
+      << std::right << std::setw(5) << length
+      << std::right << std::setw(time_field_length + 2) << time  // formatted modification_time
+      << "  " << full_path;
+  return ss.str();
+}
+
+}
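
A sketch of the ls(1)-style line that str() assembles (owner, group, and full_path are assumed to be StatInfo members, since str() reads them above; the printed date depends on the local timezone):

    StatInfo si;
    si.file_type = StatInfo::IS_DIR;
    si.permissions = 0755;            // renders as "drwxr-xr-x"
    si.block_replication = 0;         // directories print "-"
    si.owner = "hdfs";
    si.group = "supergroup";
    si.length = 0;
    si.modification_time = 1478041200000ULL;  // milliseconds since epoch
    si.full_path = "/tmp";
    std::cout << si.str() << std::endl;
    // => roughly "drwxr-xr-x  -  hdfs  supergroup  0  2016-11-01 ...  /tmp"
    //    (column widths per the setw() calls above)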

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc
new file mode 100644
index 0000000..4c5c7be
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "hdfspp/status.h"
+
+#include <cassert>
+#include <sstream>
+#include <cstring>
+#include <map>
+#include <set>
+
+namespace hdfs {
+
+//  Server side exceptions that we capture from the RpcResponseHeaderProto
+const char * kStatusAccessControlException     = "org.apache.hadoop.security.AccessControlException";
+const char * kPathIsNotDirectoryException      = "org.apache.hadoop.fs.PathIsNotDirectoryException";
+const char * kSnapshotException                = "org.apache.hadoop.hdfs.protocol.SnapshotException";
+const char * kStatusStandbyException           = "org.apache.hadoop.ipc.StandbyException";
+const char * kStatusSaslException              = "javax.security.sasl.SaslException";
+const char * kPathNotFoundException            = "org.apache.hadoop.fs.InvalidPathException";
+const char * kPathNotFoundException2           = "java.io.FileNotFoundException";
+const char * kFileAlreadyExistsException       = "org.apache.hadoop.fs.FileAlreadyExistsException";
+const char * kPathIsNotEmptyDirectoryException = "org.apache.hadoop.fs.PathIsNotEmptyDirectoryException";
+
+
+const static std::map<std::string, int> kKnownServerExceptionClasses = {
+    {kStatusAccessControlException,     Status::kAccessControlException},
+    {kPathIsNotDirectoryException,      Status::kNotADirectory},
+    {kSnapshotException,                Status::kSnapshotProtocolException},
+    {kStatusStandbyException,           Status::kStandbyException},
+    {kStatusSaslException,              Status::kAuthenticationFailed},
+    {kPathNotFoundException,            Status::kPathNotFound},
+    {kPathNotFoundException2,           Status::kPathNotFound},
+    {kFileAlreadyExistsException,       Status::kFileAlreadyExists},
+    {kPathIsNotEmptyDirectoryException, Status::kPathIsNotEmptyDirectory}
+};
+
+// Errors that retry cannot fix. TODO: complete the list.
+const static std::set<int> noRetryExceptions = {
+  Status::kPermissionDenied,
+  Status::kAuthenticationFailed,
+  Status::kAccessControlException
+};
+
+Status::Status(int code, const char *msg1)
+               : code_(code) {
+  if(msg1) {
+    msg_ = msg1;
+  }
+}
+
+Status::Status(int code, const char *exception_class_name, const char *exception_details)
+               : code_(code) {
+  // If we can assure this never gets nullptr args this can be
+  // in the initializer list.
+  if(exception_class_name)
+    exception_class_ = exception_class_name;
+  if(exception_details)
+    msg_ = exception_details;
+
+  std::map<std::string, int>::const_iterator it = kKnownServerExceptionClasses.find(exception_class_);
+  if(it != kKnownServerExceptionClasses.end()) {
+    code_ = it->second;
+  }
+}
+
+
+Status Status::OK() {
+  return Status();
+}
+
+Status Status::InvalidArgument(const char *msg) {
+  return Status(kInvalidArgument, msg);
+}
+
+Status Status::PathNotFound(const char *msg){
+  return Status(kPathNotFound, msg);
+}
+
+Status Status::ResourceUnavailable(const char *msg) {
+  return Status(kResourceUnavailable, msg);
+}
+
+Status Status::PathIsNotDirectory(const char *msg) {
+  return Status(kNotADirectory, msg);
+}
+
+Status Status::Unimplemented() {
+  return Status(kUnimplemented, "");
+}
+
+Status Status::Exception(const char *exception_class_name, const char *error_message) {
+  // Server side exception but can be represented by std::errc codes
+  if (exception_class_name && (strcmp(exception_class_name, kStatusAccessControlException) == 0))
+    return Status(kPermissionDenied, error_message);
+  else if (exception_class_name && (strcmp(exception_class_name, kStatusSaslException) == 0))
+    return AuthenticationFailed();
+  else if (exception_class_name && (strcmp(exception_class_name, kPathNotFoundException) == 0))
+    return Status(kPathNotFound, error_message);
+  else if (exception_class_name && (strcmp(exception_class_name, kPathNotFoundException2) == 0))
+    return Status(kPathNotFound, error_message);
+  else if (exception_class_name && (strcmp(exception_class_name, kPathIsNotDirectoryException) == 0))
+    return Status(kNotADirectory, error_message);
+  else if (exception_class_name && (strcmp(exception_class_name, kSnapshotException) == 0))
+    return Status(kInvalidArgument, error_message);
+  else if (exception_class_name && (strcmp(exception_class_name, kFileAlreadyExistsException) == 0))
+    return Status(kFileAlreadyExists, error_message);
+  else if (exception_class_name && (strcmp(exception_class_name, kPathIsNotEmptyDirectoryException) == 0))
+    return Status(kPathIsNotEmptyDirectory, error_message);
+  else
+    return Status(kException, exception_class_name, error_message);
+}
+
+Status Status::Error(const char *error_message) {
+  return Exception("Exception", error_message);
+}
+
+Status Status::AuthenticationFailed() {
+  return Status::AuthenticationFailed(nullptr);
+}
+
+Status Status::AuthenticationFailed(const char *msg) {
+  std::string formatted = "AuthenticationFailed";
+  if(msg) {
+    formatted += ": ";
+    formatted += msg;
+  }
+  return Status(kAuthenticationFailed, formatted.c_str());
+}
+
+Status Status::AuthorizationFailed() {
+  return Status::AuthorizationFailed(nullptr);
+}
+
+Status Status::AuthorizationFailed(const char *msg) {
+  std::string formatted = "AuthorizationFailed";
+  if(msg) {
+    formatted += ": ";
+    formatted += msg;
+  }
+  return Status(kPermissionDenied, formatted.c_str());
+}
+
+Status Status::Canceled() {
+  return Status(kOperationCanceled, "Operation canceled");
+}
+
+Status Status::InvalidOffset(const char *msg){
+  return Status(kInvalidOffset, msg);
+}
+
+std::string Status::ToString() const {
+  if (code_ == kOk) {
+    return "OK";
+  }
+  std::stringstream ss;
+  if(!exception_class_.empty()) {
+    ss << exception_class_ << ":";
+  }
+  ss << msg_;
+  return ss.str();
+}
+
+bool Status::notWorthRetry() const {
+  return noRetryExceptions.find(code_) != noRetryExceptions.end();
+}
+
+Status Status::MutexError(const char *msg) {
+  std::string formatted = "MutexError";
+  if(msg) {
+    formatted += ": ";
+    formatted += msg;
+  }
+  return Status(kBusy/*try_lock failure errno*/, formatted.c_str());
+}
+
+}
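
A sketch of how server-side exception names become local codes (the message text is illustrative). The three-argument constructor consults kKnownServerExceptionClasses, and Status::Exception falls through to it for classes not special-cased above:

    Status s = Status::Exception("org.apache.hadoop.ipc.StandbyException",
                                 "Operation category READ is not supported in state standby");
    // s now carries the Status::kStandbyException code, which
    // FixedDelayWithFailover::ShouldRetry checks (via get_server_exception_type(),
    // declared elsewhere in status.h) to trigger a failover instead of a retry.
    bool retryable = !s.notWorthRetry();      // true: standby is worth retrying

    Status denied = Status::Exception(kStatusAccessControlException, "Permission denied");
    bool hopeless = denied.notWorthRetry();   // true: retrying can't fix ACLs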

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/uri.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/uri.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/uri.cc
new file mode 100644
index 0000000..9e9319b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/uri.cc
@@ -0,0 +1,454 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include <hdfspp/uri.h>
+
+#include <uriparser2/uriparser/Uri.h>
+
+#include <string.h>
+#include <sstream>
+#include <cstdlib>
+#include <cassert>
+#include <limits>
+
+namespace hdfs
+{
+
+///////////////////////////////////////////////////////////////////////////////
+//
+//   Internal utilities
+//
+///////////////////////////////////////////////////////////////////////////////
+
+const char kReserved[] = ":/?#[]@%+";
+
+std::string URI::encode(const std::string & decoded)
+{
+  bool hasCharactersToEncode = false;
+  for (auto c : decoded)
+  {
+    if (isalnum(c) || (strchr(kReserved, c) == NULL))
+    {
+      continue;
+    }
+    else
+    {
+      hasCharactersToEncode = true;
+      break;
+    }
+  }
+
+  if (hasCharactersToEncode)
+  {
+    std::vector<char> buf(decoded.size() * 3 + 1);
+    uriEscapeA(decoded.c_str(), &buf[0], true, URI_BR_DONT_TOUCH);
+    return std::string(&buf[0]);
+  }
+  else
+  {
+    return decoded;
+  }
+}
+
+std::string URI::decode(const std::string & encoded)
+{
+  bool hasCharactersToDecode = false;
+  for (auto c : encoded)
+  {
+    switch (c)
+    {
+    case '%':
+    case '+':
+      hasCharactersToDecode = true;
+      break;
+    default:
+      continue;
+    }
+  }
+
+  if (hasCharactersToDecode)
+  {
+    std::vector<char> buf(encoded.size() + 1);
+    strncpy(&buf[0], encoded.c_str(), buf.size());
+    uriUnescapeInPlaceExA(&buf[0], true, URI_BR_DONT_TOUCH);
+    return std::string(&buf[0]);
+  }
+  else
+  {
+    return encoded;
+  }
+}
+
+std::vector<std::string> split(const std::string input, char separator)
+{
+  std::vector<std::string> result;
+
+  if (!input.empty())
+  {
+    const char * remaining = input.c_str();
+    if (*remaining == '/')
+      remaining++;
+
+    const char * next_end = strchr(remaining, separator);
+    while (next_end) {
+      int len = next_end - remaining;
+      if (len)
+        result.push_back(std::string(remaining, len));
+      else
+        result.push_back("");
+      remaining = next_end + 1;
+      next_end = strchr(remaining, separator);
+    }
+    result.push_back(std::string(remaining));
+  }
+
+  return result;
+}
+
+
+
+///////////////////////////////////////////////////////////////////////////////
+//
+//   Parsing
+//
+///////////////////////////////////////////////////////////////////////////////
+
+
+
+std::string copy_range(const UriTextRangeA *r) {
+  const int size = r->afterLast - r->first;
+  if (size) {
+      return std::string(r->first, size);
+  }
+  return "";
+}
+
+bool parse_int(const UriTextRangeA *r, int32_t& result)
+{
+  std::string int_str = copy_range(r);
+  if(!int_str.empty()) {
+    errno = 0;
+    unsigned long val = ::strtoul(int_str.c_str(), nullptr, 10);
+    if(errno == 0 && val <= std::numeric_limits<uint16_t>::max()) {
+      result = val;
+      return true;
+    } else {
+      return false;
+    }
+  }
+  return true;
+}
+
+
+std::vector<std::string> copy_path(const UriPathSegmentA *ps) {
+  std::vector<std::string> result;
+  if (nullptr == ps)
+    return result;
+
+  for (; ps != 0; ps = ps->next) {
+    result.push_back(copy_range(&ps->text));
+  }
+
+  return result;
+}
+
+void parse_user_info(const UriTextRangeA *r, std::string * user, std::string * pass) {
+  // Output parameters
+  assert(user);
+  assert(pass);
+
+  std::string user_and_password = copy_range(r);
+  if (!user_and_password.empty()) {
+    const char * begin = user_and_password.c_str();
+    const char * colon_loc = strchr(begin, ':');
+    if (colon_loc) {
+      *user = std::string(begin, colon_loc - begin);
+      *pass = colon_loc + 1;
+    } else {
+      *user = user_and_password;
+    }
+  }
+}
+
+
+std::vector<URI::Query> parse_queries(const char *first, const char *afterLast) {
+  std::vector<URI::Query> result;
+  UriQueryListA * query;
+  int count;
+  int dissect_result = uriDissectQueryMallocExA(&query, &count, first, afterLast, false, URI_BR_DONT_TOUCH);
+  if (URI_SUCCESS == dissect_result) {
+    for (auto ps = query; ps != nullptr; ps = ps->next) {
+      std::string key = ps->key ? URI::encode(ps->key) : "";
+      std::string value = ps->value ? URI::encode(ps->value) : "";
+      result.emplace_back(key, value);
+    }
+    uriFreeQueryListA(query);
+  }
+
+  return result;
+}
+
+// Parse a string into a URI.  Throws an hdfs::uri_parse_error if the URI is malformed.
+URI URI::parse_from_string(const std::string &str)
+{
+  URI ret;
+  bool ok = true;
+
+  UriParserStateA state;
+  memset(&state, 0, sizeof(state));
+  UriUriA uu;
+
+  state.uri = &uu;
+  int parseResult = uriParseUriA(&state, str.c_str());
+  ok &= (parseResult == URI_SUCCESS);
+
+  if (ok) {
+    ret.scheme = copy_range(&uu.scheme);
+    ret.host = copy_range(&uu.hostText);
+    ok &= parse_int(&uu.portText, ret._port);
+    ret.path = copy_path(uu.pathHead);
+    ret.queries = parse_queries(uu.query.first, uu.query.afterLast);
+    ret.fragment = copy_range(&uu.fragment);
+    parse_user_info(&uu.userInfo, &ret.user, &ret.pass);
+  }
+  uriFreeUriMembersA(&uu);
+
+  if (ok) {
+    return ret;
+  } else {
+    throw uri_parse_error(str);
+  }
+}
+
+///////////////////////////////////////////////////////////////////////////////
+//
+//   Getters and setters
+//
+///////////////////////////////////////////////////////////////////////////////
+
+URI::URI() : _port(-1) {}
+
+URI::Query::Query(const std::string& k, const std::string& v) : key(k), value(v) {}
+
+std::string URI::str(bool encoded_output) const
+{
+  std::stringstream ss;
+  if (!scheme.empty()) ss << from_encoded(encoded_output, scheme) << "://";
+  if (!user.empty() || !pass.empty()) {
+    if (!user.empty()) ss << from_encoded(encoded_output, user);
+    if (!pass.empty()) ss << ":" << from_encoded(encoded_output, pass);
+    ss << "@";
+  }
+  if (has_authority()) ss << build_authority(encoded_output);
+  if (!path.empty()) ss << get_path(encoded_output);
+  if (!queries.empty()) ss << "?" << get_query(encoded_output);
+  if (!fragment.empty()) ss << "#" << from_encoded(encoded_output, fragment);
+
+  return ss.str();
+}
+
+bool URI::has_authority() const
+{
+  return (!host.empty()) || (has_port());
+}
+
+std::string URI::build_authority(bool encoded_output) const
+{
+  std::stringstream ss;
+  ss << URI::from_encoded(encoded_output, host);
+  if (has_port())
+  {
+    ss << ":" << _port;
+  }
+  return ss.str();
+}
+
+std::string URI::get_scheme(bool encoded_output) const {
+  return from_encoded(encoded_output, scheme);
+}
+
+void URI::set_scheme(const std::string &s, bool encoded_input) {
+  scheme = to_encoded(encoded_input, s);
+}
+
+std::string URI::get_host(bool encoded_output) const {
+  return from_encoded(encoded_output, host);
+}
+
+void URI::set_host(const std::string& h, bool encoded_input) {
+  host = to_encoded(encoded_input, h);
+}
+
+bool URI::has_port() const {
+  return _port != -1;
+}
+
+uint16_t URI::get_port() const {
+  return (uint16_t)_port;
+}
+
+uint16_t URI::get_port_or_default(uint16_t val) const {
+  return has_port() ? (uint16_t)_port : val;
+}
+
+void URI::set_port(uint16_t p)
+{
+  _port = (int32_t)p & 0xFFFF;
+}
+
+void URI::clear_port()
+{
+  _port = -1;
+}
+
+std::string URI::get_path(bool encoded_output) const
+{
+  std::ostringstream out;
+  for (const std::string& s: path) {
+    out << "/" << from_encoded(encoded_output, s);
+  }
+  return out.str();
+}
+
+std::vector<std::string> URI::get_path_elements(bool encoded_output) const
+{
+  std::vector<std::string> result;
+  for (const std::string& path_elem: path) {
+    result.push_back(from_encoded(encoded_output, path_elem));
+  }
+
+  return result;
+}
+
+void URI::parse_path(bool input_encoded, const std::string &input_path)
+{
+  std::vector<std::string> split_path = split(input_path, '/');
+  for (const std::string& s: split_path) {
+    path.push_back(to_encoded(input_encoded, s));
+  }
+}
+
+// Mostly copied and modified from uriparser2.c
+
+void URI::set_path(const std::string &p, bool encoded_input) {
+  parse_path(encoded_input, p);
+}
+
+void URI::add_path(const std::string &p, bool encoded_input)
+{
+  path.push_back(to_encoded(encoded_input, p));
+}
+
+std::string URI::get_query(bool encoded_output) const {
+  bool first = true;
+  std::stringstream ss;
+  for (const Query& q: queries) {
+    if (!first) {
+      ss << "&";
+    }
+    ss << from_encoded(encoded_output, q.key) << "=" << from_encoded(encoded_output, q.value);
+    first = false;
+  }
+
+  return ss.str();
+}
+
+std::vector<URI::Query> URI::get_query_elements(bool encoded_output) const
+{
+  std::vector<Query> result;
+  for (const Query& q: queries) {
+    std::string key = from_encoded(encoded_output, q.key);
+    std::string value = from_encoded(encoded_output, q.value);
+    result.emplace_back(key, value);
+  }
+
+  return result;
+}
+
+void URI::set_query(const std::string &q) {
+  // afterLast points one past the final character, not past the NUL terminator
+  queries = parse_queries(q.c_str(), q.c_str() + q.size());
+}
+
+
+void URI::add_query(const std::string &name, const std::string &value, bool encoded_input)
+{
+  queries.emplace_back(to_encoded(encoded_input, name), to_encoded(encoded_input, value));
+}
+
+void URI::remove_query(const std::string &q_name, bool encoded_input)
+{
+  if (queries.empty())
+    return;
+
+  // This is the one place we need to do decoded comparisons
+  std::string decoded_key = encoded_input ? decode(q_name) : q_name;
+
+  for (int i = queries.size() - 1; i >= 0; i--) {
+    if (decode(queries[i].key) == decoded_key) {
+      queries.erase(queries.begin() + i);
+    }
+  }
+}
+
+std::string URI::get_fragment(bool encoded_output) const {
+  return from_encoded(encoded_output, fragment);
+}
+
+void URI::set_fragment(const std::string &f, bool encoded_input) {
+  fragment = to_encoded(encoded_input, f);
+}
+
+std::string URI::from_encoded(bool encoded_output, const std::string & input) {
+  return encoded_output ? input : decode(input);
+}
+
+std::string URI::to_encoded(bool encoded_input, const std::string & input) {
+  return encoded_input ? input : encode(input);
+}
+
+std::string URI::GetDebugString() const {
+  std::stringstream ss;
+  ss << std::endl;
+  ss << "\t" << "uri.str() = \"" << str() << "\"" << std::endl;
+  ss << "\t" << "uri.get_scheme() = \"" << get_scheme() << "\"" << std::endl;
+  ss << "\t" << "uri.get_host() = \"" << get_host() << "\"" << std::endl;
+
+  if(_port == -1)
+    ss << "\t" << "uri.get_port() = invalid (uninitialized)" << std::endl;
+  else
+    ss << "\t" << "uri.get_port() = \"" << _port << "\"" << std::endl;
+
+  ss << "\t" << "uri.get_path() = \"" << get_path() << "\"" << std::endl;
+  ss << "\t" << "uri.get_fragment() = \"" << get_fragment() << "\"" << 
std::endl;
+
+
+  std::vector<Query> query_elems = get_query_elements();
+
+  if(query_elems.size() > 0)
+    ss << "\t" << "Query elements:" << std::endl;
+
+  for(auto qry = query_elems.begin(); qry != query_elems.end(); qry++) {
+    ss << "\t\t" << qry->key << " -> " << qry->value << std::endl;
+  }
+
+  return ss.str();
+}
+
+} // end namespace hdfs
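
For reference, a minimal sketch of how the URI class above is intended to be
used.  Everything here is illustrative: the include path and the example URI
string are assumptions, not part of the patch.

    #include "common/uri.h"   // header location assumed; link against uri.cc
    #include <cassert>
    #include <iostream>

    int main() {
      // parse_from_string throws hdfs::uri_parse_error on malformed input
      hdfs::URI uri = hdfs::URI::parse_from_string(
          "hdfs://nn1.example.com:8020/user/test?op=read#frag");

      assert(uri.get_scheme() == "hdfs");
      assert(uri.get_host() == "nn1.example.com");
      assert(uri.has_port() && uri.get_port() == 8020);
      assert(uri.get_path() == "/user/test");
      assert(uri.get_fragment() == "frag");

      // Falls back to the supplied default when no port was parsed.
      hdfs::URI no_port = hdfs::URI::parse_from_string("hdfs://nn1.example.com/");
      assert(no_port.get_port_or_default(8020) == 8020);

      std::cout << uri.GetDebugString() << std::endl;
      return 0;
    }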

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc
new file mode 100644
index 0000000..375f951
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "common/util.h"
+#include "common/util_c.h"
+
+#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
+
+#include <exception>
+#include <sstream>
+#include <iostream>
+#include <iomanip>
+#include <thread>
+
+
+namespace hdfs {
+
+bool ReadDelimitedPBMessage(::google::protobuf::io::CodedInputStream *in,
+                            ::google::protobuf::MessageLite *msg) {
+  uint32_t size = 0;
+  if (!in->ReadVarint32(&size))
+    return false;
+  auto limit = in->PushLimit(size);
+  bool res = msg->ParseFromCodedStream(in);
+  in->PopLimit(limit);
+
+  return res;
+}
+
+
+std::string SerializeDelimitedProtobufMessage(const ::google::protobuf::MessageLite *msg,
+                                              bool *err) {
+  namespace pbio = ::google::protobuf::io;
+
+  std::string buf;
+
+  int size = msg->ByteSize();
+  buf.reserve(pbio::CodedOutputStream::VarintSize32(size) + size);
+  pbio::StringOutputStream ss(&buf);
+  pbio::CodedOutputStream os(&ss);
+  os.WriteVarint32(size);
+
+  // Always serialize; only report the result if the caller asked for it.
+  bool serialized = msg->SerializeToCodedStream(&os);
+  if(err)
+    *err = serialized;
+
+  return buf;
+}
+
+
+std::string GetRandomClientName() {
+  std::vector<unsigned char> buf(8);
+  RAND_pseudo_bytes(&buf[0], 8);
+
+  std::ostringstream oss;
+  oss << "DFSClient_"  << getpid() <<  "_" <<
+          std::this_thread::get_id() << "_" <<
+          std::setw(2) << std::hex << std::uppercase << std::setfill('0');
+  for (unsigned char b: buf)
+    oss << std::setw(2) << static_cast<unsigned>(b);  // setw is consumed per output
+
+  return oss.str();
+}
+
+std::string Base64Encode(const std::string &src) {
+  //encoded size is (sizeof(buf) + 2) / 3 * 4
+  static const std::string base64_chars =
+               "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
+               "abcdefghijklmnopqrstuvwxyz"
+               "0123456789+/";
+  std::string ret;
+  int i = 0;
+  int j = 0;
+  unsigned char char_array_3[3];
+  unsigned char char_array_4[4];
+  unsigned const char *bytes_to_encode = reinterpret_cast<unsigned const char *>(&src[i]);
+  unsigned int in_len = src.size();
+
+  while (in_len--) {
+    char_array_3[i++] = *(bytes_to_encode++);
+    if (i == 3) {
+      char_array_4[0] = (char_array_3[0] & 0xfc) >> 2;
+      char_array_4[1] = ((char_array_3[0] & 0x03) << 4) + ((char_array_3[1] & 0xf0) >> 4);
+      char_array_4[2] = ((char_array_3[1] & 0x0f) << 2) + ((char_array_3[2] & 0xc0) >> 6);
+      char_array_4[3] = char_array_3[2] & 0x3f;
+
+      for(i = 0; i < 4; i++)
+        ret += base64_chars[char_array_4[i]];
+      i = 0;
+    }
+  }
+
+  if (i)  {
+    for(j = i; j < 3; j++)
+      char_array_3[j] = '\0';
+
+    char_array_4[0] = (char_array_3[0] & 0xfc) >> 2;
+    char_array_4[1] = ((char_array_3[0] & 0x03) << 4) + ((char_array_3[1] & 0xf0) >> 4);
+    char_array_4[2] = ((char_array_3[1] & 0x0f) << 2) + ((char_array_3[2] & 0xc0) >> 6);
+    char_array_4[3] = char_array_3[2] & 0x3f;
+
+    for (j = 0; j < i + 1; j++)
+      ret += base64_chars[char_array_4[j]];
+
+    while (i++ < 3)
+      ret += '=';
+  }
+  return ret;
+}
+
+
+std::string SafeDisconnect(asio::ip::tcp::socket *sock) {
+  std::string err;
+  if(sock && sock->is_open()) {
+    /**
+     *  Even though we just checked that the socket is open it's possible
+     *  it isn't in a state where it can properly send or receive.  If that's
+     *  the case asio will turn the underlying error codes from shutdown()
+     *  and close() into unhelpfully named std::exceptions.  Due to the
+     *  relatively innocuous nature of most of these error codes it's better
+     *  to just catch them and return an error string so the caller can log
+     *  the failure.
+     **/
+
+    try {
+      sock->shutdown(asio::ip::tcp::socket::shutdown_both);
+    } catch (const std::exception &e) {
+      err = std::string("shutdown() threw: ") + e.what();
+    }
+
+    try {
+      sock->close();
+    } catch (const std::exception &e) {
+      // don't append if shutdown() already failed; the first failure is the useful one
+      if(err.empty())
+        err = std::string("close() threw: ") + e.what();
+    }
+
+  }
+  return err;
+}
+
+bool IsHighBitSet(uint64_t num) {
+  uint64_t firstBit = (uint64_t) 1 << 63;
+  return (num & firstBit) != 0;
+}
+
+}
+
+void ShutdownProtobufLibrary_C() {
+  google::protobuf::ShutdownProtobufLibrary();
+}
+
+
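
A quick sanity check for Base64Encode above; this is a sketch (the RFC 4648
test vectors are standard, the include path is an assumption):

    #include "common/util.h"
    #include <cassert>

    int main() {
      assert(hdfs::Base64Encode("Man") == "TWFu");  // full 3-byte group
      assert(hdfs::Base64Encode("Ma")  == "TWE=");  // one byte of padding
      assert(hdfs::Base64Encode("M")   == "TQ==");  // two bytes of padding
      return 0;
    }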

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h
new file mode 100644
index 0000000..be902bd
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIB_COMMON_UTIL_H_
+#define LIB_COMMON_UTIL_H_
+
+#include "hdfspp/status.h"
+#include "common/logging.h"
+
+#include <sstream>
+#include <mutex>
+#include <string>
+
+#include <asio/error_code.hpp>
+#include <openssl/rand.h>
+
+#include <google/protobuf/message_lite.h>
+#include <google/protobuf/io/coded_stream.h>
+#include <asio.hpp>
+
+namespace hdfs {
+
+// typedefs based on code that's repeated everywhere
+typedef std::lock_guard<std::mutex> mutex_guard;
+
+
+static inline Status ToStatus(const ::asio::error_code &ec) {
+  if (ec) {
+    return Status(ec.value(), ec.message().c_str());
+  } else {
+    return Status::OK();
+  }
+}
+
+// Determine size of buffer that needs to be allocated in order to serialize msg
+// in delimited format
+static inline int DelimitedPBMessageSize(const ::google::protobuf::MessageLite *msg) {
+  size_t size = msg->ByteSize();
+  return ::google::protobuf::io::CodedOutputStream::VarintSize32(size) + size;
+}
+
+// Construct msg from the input held in the CodedInputStream
+// return false on failure, otherwise return true
+bool ReadDelimitedPBMessage(::google::protobuf::io::CodedInputStream *in,
+                            ::google::protobuf::MessageLite *msg);
+
+// Serialize msg into a delimited form (java protobuf compatible)
+// err, if not null, will be set to false on failure
+std::string SerializeDelimitedProtobufMessage(const ::google::protobuf::MessageLite *msg,
+                                              bool *err);
+
+std::string Base64Encode(const std::string &src);
+
+// Return a new high-entropy client name
+std::string GetRandomClientName();
+
+// Returns true if _someone_ is holding the lock (not necessarily this thread,
+// but a std::mutex doesn't track which thread is holding the lock)
+template<class T>
+bool lock_held(T & mutex) {
+  bool result = !mutex.try_lock();
+  if (!result)
+    mutex.unlock();
+  return result;
+}
+
+// Shutdown and close a socket safely; will check if the socket is open and
+// catch anything thrown by asio.
+// Returns a string containing an error message on failure, otherwise an empty string.
+std::string SafeDisconnect(asio::ip::tcp::socket *sock);
+
+
+
+// The following helper function is used for classes that look like the following:
+//
+// template <typename socket_like_object>
+// class ObjectThatHoldsSocket {
+//   socket_like_object sock_;
+//   void DoSomethingWithAsioTcpSocket();
+// }
+//
+// The trick here is that ObjectThatHoldsSocket may be templated on a mock socket
+// in mock tests.  If you have a method that explicitly needs to call some asio
+// method unrelated to the mock test you need a way of making sure socket_like_object
+// is, in fact, an asio::ip::tcp::socket.  Otherwise the mocks need to implement
+// lots of no-op boilerplate.  This will return the value of the input param if
+// it's an asio socket, and nullptr if it's anything else.
+
+template <typename sock_t>
+inline asio::ip::tcp::socket *get_asio_socket_ptr(sock_t *s) {
+  (void)s;
+  return nullptr;
+}
+template<>
+inline asio::ip::tcp::socket *get_asio_socket_ptr<asio::ip::tcp::socket>
+                                            (asio::ip::tcp::socket *s) {
+  return s;
+}
+
+// Check if the high bit is set
+bool IsHighBitSet(uint64_t num);
+
+
+// Provide a way to do an atomic swap on a callback.
+// SetCallback, AtomicSwapCallback, and GetCallback can only be called once each.
+// AtomicSwapCallback and GetCallback must only be called after SetCallback.
+//
+// We can't throw on error, and since the callback is templated it's tricky to
+// generate generic dummy callbacks.  Complain loudly in the log and get good
+// test coverage.  It shouldn't be too hard to avoid invalid states.
+template <typename CallbackType>
+class SwappableCallbackHolder {
+ private:
+  std::mutex state_lock_;
+  CallbackType callback_;
+  bool callback_set_      = false;
+  bool callback_swapped_  = false;
+  bool callback_accessed_ = false;
+ public:
+  bool IsCallbackSet() {
+    mutex_guard swap_lock(state_lock_);
+    return callback_set_;
+  }
+
+  bool IsCallbackAccessed() {
+    mutex_guard swap_lock(state_lock_);
+    return callback_accessed_;
+  }
+
+  bool SetCallback(const CallbackType& callback) {
+    mutex_guard swap_lock(state_lock_);
+    if(callback_set_ || callback_swapped_ || callback_accessed_) {
+      LOG_ERROR(kAsyncRuntime, << "SetCallback violates access invariants.")
+      return false;
+    }
+    callback_ = callback;
+    callback_set_ = true;
+    return true;
+  }
+
+  CallbackType AtomicSwapCallback(const CallbackType& replacement, bool& swapped) {
+    mutex_guard swap_lock(state_lock_);
+    if(!callback_set_ || callback_swapped_) {
+      LOG_ERROR(kAsyncRuntime, << "AtomicSwapCallback violates access invariants.")
+      swapped = false;
+      return callback_;
+    } else if (callback_accessed_) {
+      // Common case where callback has been invoked but caller may not know.
+      // No swap happens; report that through the out param rather than leaving it unset.
+      LOG_DEBUG(kAsyncRuntime, << "AtomicSwapCallback called after callback has been accessed");
+      swapped = false;
+      return callback_;
+    }
+
+    CallbackType old = callback_;
+    callback_ = replacement;
+    callback_swapped_ = true;
+    swapped = true;
+    return old;
+  }
+  CallbackType GetCallback() {
+    mutex_guard swap_lock(state_lock_);
+    if(!callback_set_ || callback_accessed_) {
+      LOG_ERROR(kAsyncRuntime, << "GetCallback violates access invariants.")
+    }
+    callback_accessed_ = true;
+    return callback_;
+  }
+};
+
+
+}
+
+#endif
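
A minimal usage sketch for SwappableCallbackHolder above; the callback type
and values are illustrative only:

    #include "common/util.h"
    #include <functional>
    #include <iostream>

    int main() {
      hdfs::SwappableCallbackHolder<std::function<void(int)>> holder;

      // SetCallback must happen first, and exactly once.
      holder.SetCallback([](int v) { std::cout << "original: " << v << "\n"; });

      // Swap in a replacement; the original comes back so it can still run.
      bool swapped = false;
      auto original = holder.AtomicSwapCallback(
          [](int v) { std::cout << "replacement: " << v << "\n"; }, swapped);
      if (swapped)
        original(42);

      // GetCallback may be called once; it returns the current (replacement) callback.
      holder.GetCallback()(7);
      return 0;
    }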

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util_c.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util_c.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util_c.h
new file mode 100644
index 0000000..c7db7d2
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util_c.h
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIB_COMMON_UTIL_C_H_
+#define LIB_COMMON_UTIL_C_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+  void ShutdownProtobufLibrary_C();
+
+#ifdef __cplusplus
+} /* end extern "C" */
+#endif
+
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/CMakeLists.txt
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/CMakeLists.txt
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/CMakeLists.txt
new file mode 100644
index 0000000..fda51a1
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/CMakeLists.txt
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+add_library(connection_obj OBJECT datanodeconnection.cc)
+add_dependencies(connection_obj proto)
+add_library(connection $<TARGET_OBJECTS:connection_obj>)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc
new file mode 100644
index 0000000..27cd666
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "datanodeconnection.h"
+#include "common/util.h"
+
+namespace hdfs {
+
+DataNodeConnection::~DataNodeConnection(){}
+DataNodeConnectionImpl::~DataNodeConnectionImpl(){}
+
+DataNodeConnectionImpl::DataNodeConnectionImpl(asio::io_service * io_service,
+                                               const ::hadoop::hdfs::DatanodeInfoProto &dn_proto,
+                                               const hadoop::common::TokenProto *token,
+                                               LibhdfsEvents *event_handlers) : event_handlers_(event_handlers)
+{
+  using namespace ::asio::ip;
+
+  conn_.reset(new tcp::socket(*io_service));
+  auto datanode_addr = dn_proto.id();
+  endpoints_[0] = tcp::endpoint(address::from_string(datanode_addr.ipaddr()),
+                                  datanode_addr.xferport());
+  uuid_ = dn_proto.id().datanodeuuid();
+
+  if (token) {
+    token_.reset(new hadoop::common::TokenProto());
+    token_->CheckTypeAndMergeFrom(*token);
+  }
+}
+
+
+void DataNodeConnectionImpl::Connect(
+             std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)> handler) {
+  // Keep the DN from being freed until we're done
+  mutex_guard state_lock(state_lock_);
+  auto shared_this = shared_from_this();
+  asio::async_connect(*conn_, endpoints_.begin(), endpoints_.end(),
+          [shared_this, handler](const asio::error_code &ec, std::array<asio::ip::tcp::endpoint, 1>::iterator it) {
+            (void)it;
+            handler(ToStatus(ec), shared_this); });
+}
+
+void DataNodeConnectionImpl::Cancel() {
+  std::string err;
+
+  { // scope the lock for disconnect only; the log has its own lock
+    mutex_guard state_lock(state_lock_);
+    err = SafeDisconnect(conn_.get());
+  }
+
+  if(!err.empty()) {
+    LOG_WARN(kBlockReader, << "Error disconnecting socket in DataNodeConnectionImpl::Cancel, " << err);
+  }
+}
+
+
+}
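
A hedged sketch of how a caller drives DataNodeConnectionImpl::Connect; the
DatanodeInfoProto field values and io_service wiring below are illustrative
(real callers take the proto from a block-locations response):

    ::hadoop::hdfs::DatanodeInfoProto dn_proto;
    dn_proto.mutable_id()->set_ipaddr("127.0.0.1");
    dn_proto.mutable_id()->set_xferport(50010);
    dn_proto.mutable_id()->set_datanodeuuid("example-uuid");

    asio::io_service io_service;
    hdfs::LibhdfsEvents events;
    auto dn = std::make_shared<hdfs::DataNodeConnectionImpl>(
        &io_service, dn_proto, /*token=*/nullptr, &events);

    dn->Connect([](hdfs::Status status, std::shared_ptr<hdfs::DataNodeConnection> conn) {
      if (!status.ok())
        return;  // connect failed; conn stays alive until the handler returns
      // issue block read/write requests over conn here
    });

    io_service.run();  // the async_connect posted by Connect completes here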

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h
new file mode 100644
index 0000000..21193b3
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBHDFSPP_LIB_CONNECTION_DATANODECONNECTION_H_
+#define LIBHDFSPP_LIB_CONNECTION_DATANODECONNECTION_H_
+
+#include "common/hdfs_ioservice.h"
+#include "common/async_stream.h"
+#include "ClientNamenodeProtocol.pb.h"
+#include "common/libhdfs_events_impl.h"
+#include "common/logging.h"
+#include "common/util.h"
+#include "common/new_delete.h"
+
+#include "asio.hpp"
+
+namespace hdfs {
+
+class DataNodeConnection : public AsyncStream {
+public:
+  MEMCHECKED_CLASS(DataNodeConnection)
+  std::string uuid_;
+  std::unique_ptr<hadoop::common::TokenProto> token_;
+
+  virtual ~DataNodeConnection();
+  virtual void Connect(std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)> handler) = 0;
+  virtual void Cancel() = 0;
+};
+
+
+struct SocketDeleter {
+  inline void operator()(asio::ip::tcp::socket *sock) {
+    // Cancel may have already closed the socket.
+    std::string err = SafeDisconnect(sock);
+    if(!err.empty()) {
+        LOG_WARN(kBlockReader, << "Error disconnecting socket: " << err);
+    }
+    delete sock;
+  }
+};
+
+class DataNodeConnectionImpl : public DataNodeConnection, public std::enable_shared_from_this<DataNodeConnectionImpl> {
+private:
+  // held (briefly) while posting async ops to the asio task queue
+  std::mutex state_lock_;
+public:
+  std::unique_ptr<asio::ip::tcp::socket, SocketDeleter> conn_;
+  std::array<asio::ip::tcp::endpoint, 1> endpoints_;
+  // note: uuid_ and token_ are inherited from DataNodeConnection; redeclaring
+  // uuid_ here would shadow the base member that callers read
+  LibhdfsEvents *event_handlers_;
+
+  virtual ~DataNodeConnectionImpl();
+  DataNodeConnectionImpl(asio::io_service * io_service, const ::hadoop::hdfs::DatanodeInfoProto &dn_proto,
+                          const hadoop::common::TokenProto *token,
+                          LibhdfsEvents *event_handlers);
+
+  void Connect(std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)> handler) override;
+
+  void Cancel() override;
+
+  void async_read_some(const MutableBuffers &buf,
+                       std::function<void (const asio::error_code & error, std::size_t bytes_transferred) > handler)
+                       override {
+    event_handlers_->call("DN_read_req", "", "", buf.end() - buf.begin());
+
+    mutex_guard state_lock(state_lock_);
+    conn_->async_read_some(buf, handler);
+  }
+
+  void async_write_some(const ConstBuffers &buf,
+                        std::function<void (const asio::error_code & error, std::size_t bytes_transferred) > handler)
+                        override {
+    event_handlers_->call("DN_write_req", "", "", buf.end() - buf.begin());
+
+    mutex_guard state_lock(state_lock_);
+    conn_->async_write_some(buf, handler);
+  }
+};
+
+}
+
+#endif
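
DataNodeConnectionImpl holds a concrete asio socket, but classes layered on
top of it are often templated on the connection type so tests can substitute
mocks.  A sketch of the dispatch that get_asio_socket_ptr (declared in
common/util.h above) enables; MockSocket and MaybeTuneSocket are illustrative
names, not part of the patch:

    #include "common/util.h"

    struct MockSocket {};  // stand-in used by unit tests

    template <typename SockT>
    void MaybeTuneSocket(SockT *s) {
      // Real asio sockets get the option applied; mocks silently fall through.
      if (asio::ip::tcp::socket *real = hdfs::get_asio_socket_ptr(s)) {
        real->set_option(asio::ip::tcp::no_delay(true));
      }
    }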

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt
new file mode 100644
index 0000000..624cda5
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+add_library(fs_obj OBJECT filesystem.cc filesystem_sync.cc filehandle.cc bad_datanode_tracker.cc namenode_operations.cc)
+add_dependencies(fs_obj proto)
+add_library(fs $<TARGET_OBJECTS:fs_obj>)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/bad_datanode_tracker.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/bad_datanode_tracker.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/bad_datanode_tracker.cc
new file mode 100644
index 0000000..be9020f
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/bad_datanode_tracker.cc
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "bad_datanode_tracker.h"
+
+namespace hdfs {
+
+NodeExclusionRule::~NodeExclusionRule() {}
+
+BadDataNodeTracker::BadDataNodeTracker(const Options& options)
+    : timeout_duration_(options.host_exclusion_duration),
+      test_clock_shift_(0) {}
+
+BadDataNodeTracker::~BadDataNodeTracker() {}
+
+void BadDataNodeTracker::AddBadNode(const std::string& dn) {
+  std::lock_guard<std::mutex> update_lock(datanodes_update_lock_);
+  datanodes_[dn] = Clock::now();
+}
+
+bool BadDataNodeTracker::IsBadNode(const std::string& dn) {
+  std::lock_guard<std::mutex> update_lock(datanodes_update_lock_);
+
+  if (datanodes_.count(dn) == 1) {
+    const TimePoint& entered_time = datanodes_[dn];
+    if (TimeoutExpired(entered_time)) {
+      datanodes_.erase(dn);
+      return false;
+    }
+    /* node in set and still marked bad */
+    return true;
+  }
+  return false;
+}
+
+void BadDataNodeTracker::TEST_set_clock_shift(int t) { test_clock_shift_ = t; }
+
+bool BadDataNodeTracker::TimeoutExpired(const TimePoint& t) {
+  TimePoint threshold = Clock::now() -
+                        std::chrono::milliseconds(timeout_duration_) +
+                        std::chrono::milliseconds(test_clock_shift_);
+  if (t < threshold) {
+    return true;
+  }
+  return false;
+}
+
+ExclusionSet::ExclusionSet(const std::set<std::string>& excluded)
+    : excluded_(excluded) {}
+
+ExclusionSet::~ExclusionSet() {}
+
+bool ExclusionSet::IsBadNode(const std::string& node_uuid) {
+  return excluded_.count(node_uuid) == 1;
+}
+}
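
A short usage fragment for BadDataNodeTracker above; the option value is
illustrative (host_exclusion_duration is interpreted as milliseconds by
TimeoutExpired):

    hdfs::Options options;
    options.host_exclusion_duration = 600000;  // exclude bad nodes for 10 minutes

    hdfs::BadDataNodeTracker tracker(options);
    tracker.AddBadNode("datanode-uuid-1");

    assert(tracker.IsBadNode("datanode-uuid-1"));    // still inside the window
    assert(!tracker.IsBadNode("datanode-uuid-2"));   // never marked bad

    // Tests shift the clock forward instead of sleeping:
    tracker.TEST_set_clock_shift(600001);
    assert(!tracker.IsBadNode("datanode-uuid-1"));   // expired and erased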

