http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem_sync.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem_sync.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem_sync.cc
new file mode 100644
index 0000000..53c9e26
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem_sync.cc
@@ -0,0 +1,607 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "filesystem.h"
+
+#include <future>
+#include <tuple>
+
+#define FMT_THIS_ADDR "this=" << (void*)this
+
+// Note: This is just a place to hold boilerplate async to sync shim code,
+//       place actual filesystem logic in filesystem.cc
+//
+//
+// Shim pattern pseudocode
+//
+// Status MySynchronizedMethod(method_args):
+//  let stat = a promise<Status> wrapped in a shared_ptr
+//
+//  Create a lambda that captures stat and any other variables that need to
+//  be set based on the async operation.  When invoked set variables with the
+//  arguments passed (possibly do some translation), then set stat to indicate
+//  the return status of the async call.
+//
+//  invoke MyAsyncMethod(method_args, handler_lambda)
+//
+//  block until stat's value has been set by the async work
+//
+//  return the Status stored in stat
+
+namespace hdfs {
+
+Status FileSystemImpl::Connect(const std::string &server, const std::string &service) {
+  LOG_INFO(kFileSystem, << "FileSystemImpl::[sync]Connect(" << FMT_THIS_ADDR
+                        << ", server=" << server << ", service=" << service << ") called");
+
+  /* synchronized */
+  auto stat = std::make_shared<std::promise<Status>>();
+  std::future<Status> future = stat->get_future();
+
+  auto callback = [stat](const Status &s, FileSystem *fs) {
+    (void)fs;
+    stat->set_value(s);
+  };
+
+  Connect(server, service, callback);
+
+  /* block until promise is set */
+  auto s = future.get();
+
+  return s;
+}
+
+
+Status FileSystemImpl::ConnectToDefaultFs() {
+  auto stat = std::make_shared<std::promise<Status>>();
+  std::future<Status> future = stat->get_future();
+
+  auto callback = [stat](const Status &s, FileSystem *fs) {
+    (void)fs;
+    stat->set_value(s);
+  };
+
+  ConnectToDefaultFs(callback);
+
+  /* block until promise is set */
+  auto s = future.get();
+
+  return s;
+}
+
+
+Status FileSystemImpl::Open(const std::string &path,
+                                         FileHandle **handle) {
+  LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]Open("
+                                 << FMT_THIS_ADDR << ", path="
+                                 << path << ") called");
+
+  auto callstate = std::make_shared<std::promise<std::tuple<Status, FileHandle*>>>();
+  std::future<std::tuple<Status, FileHandle*>> future(callstate->get_future());
+
+  /* wrap async FileSystem::Open with promise to make it a blocking call */
+  auto h = [callstate](const Status &s, FileHandle *is) {
+    callstate->set_value(std::make_tuple(s, is));
+  };
+
+  Open(path, h);
+
+  /* block until promise is set */
+  auto returnstate = future.get();
+  Status stat = std::get<0>(returnstate);
+  FileHandle *file_handle = std::get<1>(returnstate);
+
+  if (!stat.ok()) {
+    delete file_handle;
+    return stat;
+  }
+  if (!file_handle) {
+    return stat;
+  }
+
+  *handle = file_handle;
+  return stat;
+}
+
+Status FileSystemImpl::GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length,
+  std::shared_ptr<FileBlockLocation> * fileBlockLocations)
+{
+  LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetBlockLocations("
+                                 << FMT_THIS_ADDR << ", path="
+                                 << path << ") called");
+
+  if (!fileBlockLocations)
+    return Status::InvalidArgument("Null pointer passed to GetBlockLocations");
+
+  auto callstate = std::make_shared<std::promise<std::tuple<Status, std::shared_ptr<FileBlockLocation>>>>();
+  std::future<std::tuple<Status, std::shared_ptr<FileBlockLocation>>> future(callstate->get_future());
+
+  /* wrap async call with promise/future to make it blocking */
+  auto callback = [callstate](const Status &s, std::shared_ptr<FileBlockLocation> blockInfo) {
+    callstate->set_value(std::make_tuple(s,blockInfo));
+  };
+
+  GetBlockLocations(path, offset, length, callback);
+
+  /* wait for async to finish */
+  auto returnstate = future.get();
+  auto stat = std::get<0>(returnstate);
+
+  if (!stat.ok()) {
+    return stat;
+  }
+
+  *fileBlockLocations = std::get<1>(returnstate);
+
+  return stat;
+}
+
+Status FileSystemImpl::GetPreferredBlockSize(const std::string &path, uint64_t & block_size) {
+  LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetPreferredBlockSize("
+                                 << FMT_THIS_ADDR << ", path="
+                                 << path << ") called");
+
+  auto callstate = std::make_shared<std::promise<std::tuple<Status, uint64_t>>>();
+  std::future<std::tuple<Status, uint64_t>> future(callstate->get_future());
+
+  /* wrap async FileSystem::GetPreferredBlockSize with promise to make it a blocking call */
+  auto h = [callstate](const Status &s, const uint64_t & bsize) {
+    callstate->set_value(std::make_tuple(s, bsize));
+  };
+
+  GetPreferredBlockSize(path, h);
+
+  /* block until promise is set */
+  auto returnstate = future.get();
+  Status stat = std::get<0>(returnstate);
+  uint64_t size = std::get<1>(returnstate);
+
+  if (!stat.ok()) {
+    return stat;
+  }
+
+  block_size = size;
+  return stat;
+}
+
+Status FileSystemImpl::SetReplication(const std::string & path, int16_t replication) {
+  LOG_DEBUG(kFileSystem,
+      << "FileSystemImpl::[sync]SetReplication(" << FMT_THIS_ADDR << ", path=" << path <<
+      ", replication=" << replication << ") called");
+
+  auto callstate = std::make_shared<std::promise<Status>>();
+  std::future<Status> future(callstate->get_future());
+
+  /* wrap async FileSystem::SetReplication with promise to make it a blocking call */
+  auto h = [callstate](const Status &s) {
+    callstate->set_value(s);
+  };
+
+  SetReplication(path, replication, h);
+
+  /* block until promise is set */
+  auto returnstate = future.get();
+  Status stat = returnstate;
+
+  return stat;
+}
+
+Status FileSystemImpl::SetTimes(const std::string & path, uint64_t mtime, uint64_t atime) {
+  LOG_DEBUG(kFileSystem,
+      << "FileSystemImpl::[sync]SetTimes(" << FMT_THIS_ADDR << ", path=" << path <<
+      ", mtime=" << mtime << ", atime=" << atime << ") called");
+
+  auto callstate = std::make_shared<std::promise<Status>>();
+  std::future<Status> future(callstate->get_future());
+
+  /* wrap async FileSystem::SetTimes with promise to make it a blocking call */
+  auto h = [callstate](const Status &s) {
+    callstate->set_value(s);
+  };
+
+  SetTimes(path, mtime, atime, h);
+
+  /* block until promise is set */
+  auto returnstate = future.get();
+  Status stat = returnstate;
+
+  return stat;
+}
+
+Status FileSystemImpl::GetFileInfo(const std::string &path,
+                                         StatInfo & stat_info) {
+  LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetFileInfo("
+                                 << FMT_THIS_ADDR << ", path="
+                                 << path << ") called");
+
+  auto callstate = std::make_shared<std::promise<std::tuple<Status, StatInfo>>>();
+  std::future<std::tuple<Status, StatInfo>> future(callstate->get_future());
+
+  /* wrap async FileSystem::GetFileInfo with promise to make it a blocking call */
+  auto h = [callstate](const Status &s, const StatInfo &si) {
+    callstate->set_value(std::make_tuple(s, si));
+  };
+
+  GetFileInfo(path, h);
+
+  /* block until promise is set */
+  auto returnstate = future.get();
+  Status stat = std::get<0>(returnstate);
+  StatInfo info = std::get<1>(returnstate);
+
+  if (!stat.ok()) {
+    return stat;
+  }
+
+  stat_info = info;
+  return stat;
+}
+
+Status FileSystemImpl::GetContentSummary(const std::string &path,
+                                         ContentSummary & content_summary) {
+  LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetContentSummary("
+                                 << FMT_THIS_ADDR << ", path="
+                                 << path << ") called");
+
+  auto callstate = std::make_shared<std::promise<std::tuple<Status, ContentSummary>>>();
+  std::future<std::tuple<Status, ContentSummary>> future(callstate->get_future());
+
+  /* wrap async FileSystem::GetContentSummary with promise to make it a blocking call */
+  auto h = [callstate](const Status &s, const ContentSummary &si) {
+    callstate->set_value(std::make_tuple(s, si));
+  };
+
+  GetContentSummary(path, h);
+
+  /* block until promise is set */
+  auto returnstate = future.get();
+  Status stat = std::get<0>(returnstate);
+  ContentSummary cs = std::get<1>(returnstate);
+
+  if (!stat.ok()) {
+    return stat;
+  }
+
+  content_summary = cs;
+  return stat;
+}
+
+Status FileSystemImpl::GetFsStats(FsInfo & fs_info) {
+  LOG_DEBUG(kFileSystem,
+      << "FileSystemImpl::[sync]GetFsStats(" << FMT_THIS_ADDR << ") called");
+
+  auto callstate = std::make_shared<std::promise<std::tuple<Status, FsInfo>>>();
+  std::future<std::tuple<Status, FsInfo>> future(callstate->get_future());
+
+  /* wrap async FileSystem::GetFsStats with promise to make it a blocking call */
+  auto h = [callstate](const Status &s, const FsInfo &si) {
+    callstate->set_value(std::make_tuple(s, si));
+  };
+
+  GetFsStats(h);
+
+  /* block until promise is set */
+  auto returnstate = future.get();
+  Status stat = std::get<0>(returnstate);
+  FsInfo info = std::get<1>(returnstate);
+
+  if (!stat.ok()) {
+    return stat;
+  }
+
+  fs_info = info;
+  return stat;
+}
+
+Status FileSystemImpl::GetListing(const std::string &path, std::vector<StatInfo> * stat_infos) {
+  LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetListing("
+                                 << FMT_THIS_ADDR << ", path="
+                                 << path << ") called");
+
+  if (!stat_infos) {
+    return Status::InvalidArgument("FileSystemImpl::GetListing: argument 'stat_infos' cannot be NULL");
+  }
+
+  auto callstate = std::make_shared<std::promise<Status>>();
+  std::future<Status> future(callstate->get_future());
+
+  /* wrap async FileSystem::GetListing with promise to make it a blocking call.
+   *
+   * Keep requesting more until we get the entire listing, and don't set the promise
+   * until we have the entire listing.
+   */
+  auto h = [callstate, stat_infos](const Status &s, const std::vector<StatInfo> & si, bool has_more) -> bool {
+    if (!si.empty()) {
+      stat_infos->insert(stat_infos->end(), si.begin(), si.end());
+    }
+
+    bool done = !s.ok() || !has_more;
+    if (done) {
+      callstate->set_value(s);
+      return false;
+    }
+    return true;
+  };
+
+  GetListing(path, h);
+
+  /* block until promise is set */
+  Status stat = future.get();
+
+  return stat;
+}
+
+Status FileSystemImpl::Mkdirs(const std::string & path, uint16_t permissions, bool createparent) {
+  LOG_DEBUG(kFileSystem,
+      << "FileSystemImpl::[sync]Mkdirs(" << FMT_THIS_ADDR << ", path=" << path <<
+      ", permissions=" << permissions << ", createparent=" << createparent << ") called");
+
+  auto callstate = std::make_shared<std::promise<Status>>();
+  std::future<Status> future(callstate->get_future());
+
+  /* wrap async FileSystem::Mkdirs with promise to make it a blocking call */
+  auto h = [callstate](const Status &s) {
+    callstate->set_value(s);
+  };
+
+  Mkdirs(path, permissions, createparent, h);
+
+  /* block until promise is set */
+  auto returnstate = future.get();
+  Status stat = returnstate;
+
+  return stat;
+}
+
+Status FileSystemImpl::Delete(const std::string &path, bool recursive) {
+  LOG_DEBUG(kFileSystem,
+      << "FileSystemImpl::[sync]Delete(" << FMT_THIS_ADDR << ", path=" << path << ", recursive=" << recursive << ") called");
+
+  auto callstate = std::make_shared<std::promise<Status>>();
+  std::future<Status> future(callstate->get_future());
+
+  /* wrap async FileSystem::Delete with promise to make it a blocking call */
+  auto h = [callstate](const Status &s) {
+    callstate->set_value(s);
+  };
+
+  Delete(path, recursive, h);
+
+  /* block until promise is set */
+  auto returnstate = future.get();
+  Status stat = returnstate;
+
+  return stat;
+}
+
+Status FileSystemImpl::Rename(const std::string &oldPath, const std::string &newPath) {
+  LOG_DEBUG(kFileSystem,
+      << "FileSystemImpl::[sync]Rename(" << FMT_THIS_ADDR << ", oldPath=" << oldPath << ", newPath=" << newPath << ") called");
+
+  auto callstate = std::make_shared<std::promise<Status>>();
+  std::future<Status> future(callstate->get_future());
+
+  /* wrap async FileSystem::Rename with promise to make it a blocking call */
+  auto h = [callstate](const Status &s) {
+    callstate->set_value(s);
+  };
+
+  Rename(oldPath, newPath, h);
+
+  /* block until promise is set */
+  auto returnstate = future.get();
+  Status stat = returnstate;
+
+  return stat;
+}
+
+Status FileSystemImpl::SetPermission(const std::string & path, uint16_t permissions) {
+  LOG_DEBUG(kFileSystem,
+      << "FileSystemImpl::[sync]SetPermission(" << FMT_THIS_ADDR << ", path=" << path << ", permissions=" << permissions << ") called");
+
+  auto callstate = std::make_shared<std::promise<Status>>();
+  std::future<Status> future(callstate->get_future());
+
+  /* wrap async FileSystem::SetPermission with promise to make it a blocking call */
+  auto h = [callstate](const Status &s) {
+    callstate->set_value(s);
+  };
+
+  SetPermission(path, permissions, h);
+
+  /* block until promise is set */
+  Status stat = future.get();
+
+  return stat;
+}
+
+Status FileSystemImpl::SetOwner(const std::string & path, const std::string & username,
+                                const std::string & groupname) {
+  LOG_DEBUG(kFileSystem,
+      << "FileSystemImpl::[sync]SetOwner(" << FMT_THIS_ADDR << ", path=" << path << ", username=" << username << ", groupname=" << groupname << ") called");
+
+  auto callstate = std::make_shared<std::promise<Status>>();
+  std::future<Status> future(callstate->get_future());
+
+  /* wrap async FileSystem::SetOwner with promise to make it a blocking call */
+  auto h = [callstate](const Status &s) {
+    callstate->set_value(s);
+  };
+
+  SetOwner(path, username, groupname, h);
+
+  /* block until promise is set */
+  Status stat = future.get();
+  return stat;
+}
+
+Status FileSystemImpl::Find(const std::string &path, const std::string &name, const uint32_t maxdepth, std::vector<StatInfo> * stat_infos) {
+  LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]Find("
+                                 << FMT_THIS_ADDR << ", path="
+                                 << path << ", name="
+                                 << name << ") called");
+
+  if (!stat_infos) {
+    return Status::InvalidArgument("FileSystemImpl::Find: argument 'stat_infos' cannot be NULL");
+  }
+
+  // In this case, we're going to have the async code populate stat_infos.
+
+  std::promise<void> promise = std::promise<void>();
+  std::future<void> future(promise.get_future());
+  Status status = Status::OK();
+
+  /**
+    * Keep requesting more until we get the entire listing. Set the promise
+    * when we have the entire listing to stop.
+    *
+    * Find guarantees that the handler will only be called once at a time,
+    * so we do not need any locking here
+    */
+  auto h = [&status, &promise, stat_infos](const Status &s, const std::vector<StatInfo> & si, bool has_more_results) -> bool {
+    if (!si.empty()) {
+      stat_infos->insert(stat_infos->end(), si.begin(), si.end());
+    }
+    if (!s.ok() && status.ok()){
+      //We make sure we set 'status' only on the first error.
+      status = s;
+    }
+    if (!has_more_results) {
+      promise.set_value();
+      return false;
+    }
+    return true;
+  };
+
+  Find(path, name, maxdepth, h);
+
+  /* block until promise is set */
+  future.get();
+  return status;
+}
+
+Status FileSystemImpl::CreateSnapshot(const std::string &path,
+    const std::string &name) {
+  LOG_DEBUG(kFileSystem,
+      << "FileSystemImpl::[sync]CreateSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called");
+
+  auto callstate = std::make_shared<std::promise<Status>>();
+  std::future<Status> future(callstate->get_future());
+
+  /* wrap async FileSystem::CreateSnapshot with promise to make it a blocking call */
+  auto h = [callstate](const Status &s) {
+    callstate->set_value(s);
+  };
+
+  CreateSnapshot(path, name, h);
+
+  /* block until promise is set */
+  auto returnstate = future.get();
+  Status stat = returnstate;
+
+  return stat;
+}
+
+Status FileSystemImpl::DeleteSnapshot(const std::string &path,
+    const std::string &name) {
+  LOG_DEBUG(kFileSystem,
+      << "FileSystemImpl::[sync]DeleteSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called");
+
+  auto callstate = std::make_shared<std::promise<Status>>();
+  std::future<Status> future(callstate->get_future());
+
+  /* wrap async FileSystem::DeleteSnapshot with promise to make it a blocking call */
+  auto h = [callstate](const Status &s) {
+    callstate->set_value(s);
+  };
+
+  DeleteSnapshot(path, name, h);
+
+  /* block until promise is set */
+  auto returnstate = future.get();
+  Status stat = returnstate;
+
+  return stat;
+}
+
+Status FileSystemImpl::RenameSnapshot(const std::string &path,
+    const std::string &old_name, const std::string &new_name) {
+  LOG_DEBUG(kFileSystem,
+    << "FileSystemImpl::[sync]RenameSnapshot(" << FMT_THIS_ADDR << ", path=" << path <<
+    ", old_name=" << old_name << ", new_name=" << new_name << ") called");
+
+  auto callstate = std::make_shared<std::promise<Status>>();
+  std::future<Status> future(callstate->get_future());
+
+  /* wrap async FileSystem::RenameSnapshot with promise to make it a blocking call */
+  auto h = [callstate](const Status &s) {
+    callstate->set_value(s);
+  };
+
+  RenameSnapshot(path, old_name, new_name, h);
+
+  /* block until promise is set */
+  auto returnstate = future.get();
+  Status stat = returnstate;
+
+  return stat;
+}
+
+Status FileSystemImpl::AllowSnapshot(const std::string &path) {
+  LOG_DEBUG(kFileSystem,
+      << "FileSystemImpl::[sync]AllowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called");
+
+  auto callstate = std::make_shared<std::promise<Status>>();
+  std::future<Status> future(callstate->get_future());
+
+  /* wrap async FileSystem::AllowSnapshot with promise to make it a blocking call */
+  auto h = [callstate](const Status &s) {
+    callstate->set_value(s);
+  };
+
+  AllowSnapshot(path, h);
+
+  /* block until promise is set */
+  auto returnstate = future.get();
+  Status stat = returnstate;
+
+  return stat;
+}
+
+Status FileSystemImpl::DisallowSnapshot(const std::string &path) {
+  LOG_DEBUG(kFileSystem,
+      << "FileSystemImpl::[sync]DisallowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called");
+
+  auto callstate = std::make_shared<std::promise<Status>>();
+  std::future<Status> future(callstate->get_future());
+
+  /* wrap async FileSystem::DisallowSnapshot with promise to make it a blocking call */
+  auto h = [callstate](const Status &s) {
+    callstate->set_value(s);
+  };
+
+  DisallowSnapshot(path, h);
+
+  /* block until promise is set */
+  auto returnstate = future.get();
+  Status stat = returnstate;
+
+  return stat;
+}
+
+}
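
Every synchronous method in filesystem_sync.cc above is an instance of the shim
pattern sketched in the file's header comment. Reduced to a self-contained C++
sketch (MyAsyncMethod and MySynchronizedMethod are hypothetical stand-ins, not
APIs in this patch; only hdfs::Status comes from the library), the pattern is:

    #include <functional>
    #include <future>
    #include <memory>

    // Stand-in for any async API that reports completion through a handler.
    // A real implementation would hand the work to an io_service thread; the
    // sketch completes inline so the example links and runs.
    void MyAsyncMethod(int arg,
                       std::function<void(const hdfs::Status &)> handler) {
      (void)arg;
      handler(hdfs::Status::OK());
    }

    hdfs::Status MySynchronizedMethod(int arg) {
      // The promise lives in a shared_ptr so the callback keeps it alive
      // no matter which thread finishes first.
      auto stat = std::make_shared<std::promise<hdfs::Status>>();
      std::future<hdfs::Status> future = stat->get_future();

      MyAsyncMethod(arg, [stat](const hdfs::Status &s) { stat->set_value(s); });

      return future.get();  // block until the async handler fires
    }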

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc
new file mode 100644
index 0000000..e46faad
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc
@@ -0,0 +1,727 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "filesystem.h"
+#include "common/continuation/asio.h"
+
+#include <asio/ip/tcp.hpp>
+
+#include <functional>
+#include <limits>
+#include <future>
+#include <tuple>
+#include <iostream>
+#include <pwd.h>
+#include <utility>
+
+#define FMT_THIS_ADDR "this=" << (void*)this
+
+using ::asio::ip::tcp;
+
+namespace hdfs {
+
+/*****************************************************************************
+ *                    NAMENODE OPERATIONS
+ ****************************************************************************/
+
+void NameNodeOperations::Connect(const std::string &cluster_name,
+                                 const std::vector<ResolvedNamenodeInfo> &servers,
+                                 std::function<void(const Status &)> &&handler) {
+  engine_->Connect(cluster_name, servers, handler);
+}
+
+bool NameNodeOperations::CancelPendingConnect() {
+  return engine_->CancelPendingConnect();
+}
+
+void NameNodeOperations::GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length,
+  std::function<void(const Status &, std::shared_ptr<const struct FileInfo>)> handler)
+{
+  using ::hadoop::hdfs::GetBlockLocationsRequestProto;
+  using ::hadoop::hdfs::GetBlockLocationsResponseProto;
+
+  LOG_TRACE(kFileSystem, << "NameNodeOperations::GetBlockLocations("
+                           << FMT_THIS_ADDR << ", path=" << path << ", ...) called");
+
+  if (path.empty()) {
+    handler(Status::InvalidArgument("GetBlockLocations: argument 'path' cannot be empty"), nullptr);
+    return;
+  }
+
+  //Protobuf gives an error 'Negative value is not supported'
+  //if the high bit is set in uint64 in GetBlockLocations
+  if (IsHighBitSet(offset)) {
+    handler(Status::InvalidArgument("GetBlockLocations: argument 'offset' cannot have high bit set"), nullptr);
+    return;
+  }
+  if (IsHighBitSet(length)) {
+    handler(Status::InvalidArgument("GetBlockLocations: argument 'length' cannot have high bit set"), nullptr);
+    return;
+  }
+
+  GetBlockLocationsRequestProto req;
+  req.set_src(path);
+  req.set_offset(offset);
+  req.set_length(length);
+
+  auto resp = std::make_shared<GetBlockLocationsResponseProto>();
+
+  namenode_.GetBlockLocations(&req, resp, [resp, handler](const Status &stat) {
+    if (stat.ok()) {
+      auto file_info = std::make_shared<struct FileInfo>();
+      auto locations = resp->locations();
+
+      file_info->file_length_ = locations.filelength();
+      file_info->last_block_complete_ = locations.islastblockcomplete();
+      file_info->under_construction_ = locations.underconstruction();
+
+      for (const auto &block : locations.blocks()) {
+        file_info->blocks_.push_back(block);
+      }
+
+      if (!locations.islastblockcomplete() &&
+          locations.has_lastblock() && locations.lastblock().b().numbytes()) {
+        file_info->blocks_.push_back(locations.lastblock());
+        file_info->file_length_ += locations.lastblock().b().numbytes();
+      }
+
+      handler(stat, file_info);
+    } else {
+      handler(stat, nullptr);
+    }
+  });
+}
+
+void NameNodeOperations::GetPreferredBlockSize(const std::string & path,
+  std::function<void(const Status &, const uint64_t)> handler)
+{
+  using ::hadoop::hdfs::GetPreferredBlockSizeRequestProto;
+  using ::hadoop::hdfs::GetPreferredBlockSizeResponseProto;
+
+  LOG_TRACE(kFileSystem, << "NameNodeOperations::GetPreferredBlockSize("
+                           << FMT_THIS_ADDR << ", path=" << path << ") called");
+
+  if (path.empty()) {
+    handler(Status::InvalidArgument("GetPreferredBlockSize: argument 'path' cannot be empty"), -1);
+    return;
+  }
+
+  GetPreferredBlockSizeRequestProto req;
+  req.set_filename(path);
+
+  auto resp = std::make_shared<GetPreferredBlockSizeResponseProto>();
+
+  namenode_.GetPreferredBlockSize(&req, resp, [resp, handler, path](const Status &stat) {
+    if (stat.ok() && resp -> has_bsize()) {
+      uint64_t block_size = resp -> bsize();
+      handler(stat, block_size);
+    } else {
+      handler(stat, -1);
+    }
+  });
+}
+
+void NameNodeOperations::SetReplication(const std::string & path, int16_t replication,
+  std::function<void(const Status &)> handler)
+{
+  using ::hadoop::hdfs::SetReplicationRequestProto;
+  using ::hadoop::hdfs::SetReplicationResponseProto;
+
+  LOG_TRACE(kFileSystem,
+      << "NameNodeOperations::SetReplication(" << FMT_THIS_ADDR << ", path=" << path <<
+      ", replication=" << replication << ") called");
+
+  if (path.empty()) {
+    handler(Status::InvalidArgument("SetReplication: argument 'path' cannot be empty"));
+    return;
+  }
+  Status replStatus = FileSystemImpl::CheckValidReplication(replication);
+  if (!replStatus.ok()) {
+    handler(replStatus);
+    return;
+  }
+  SetReplicationRequestProto req;
+  req.set_src(path);
+  req.set_replication(replication);
+
+  auto resp = std::make_shared<SetReplicationResponseProto>();
+
+  namenode_.SetReplication(&req, resp, [resp, handler, path](const Status &stat) {
+    if (stat.ok()) {
+      // Checking resp
+      if(resp -> has_result() && resp ->result() == 1) {
+        handler(stat);
+      } else {
+        //NameNode does not specify why there is no result; in testing this happened when the path was not found
+        std::string errormsg = "No such file or directory: " + path;
+        Status statNew = Status::PathNotFound(errormsg.c_str());
+        handler(statNew);
+      }
+    } else {
+      handler(stat);
+    }
+  });
+}
+
+void NameNodeOperations::SetTimes(const std::string & path, uint64_t mtime, uint64_t atime,
+  std::function<void(const Status &)> handler)
+{
+  using ::hadoop::hdfs::SetTimesRequestProto;
+  using ::hadoop::hdfs::SetTimesResponseProto;
+
+  LOG_TRACE(kFileSystem,
+      << "NameNodeOperations::SetTimes(" << FMT_THIS_ADDR << ", path=" << path <<
+      ", mtime=" << mtime << ", atime=" << atime << ") called");
+
+  if (path.empty()) {
+    handler(Status::InvalidArgument("SetTimes: argument 'path' cannot be empty"));
+    return;
+  }
+
+  SetTimesRequestProto req;
+  req.set_src(path);
+  req.set_mtime(mtime);
+  req.set_atime(atime);
+
+  auto resp = std::make_shared<SetTimesResponseProto>();
+
+  namenode_.SetTimes(&req, resp, [resp, handler, path](const Status &stat) {
+      handler(stat);
+  });
+}
+
+
+
+void NameNodeOperations::GetFileInfo(const std::string & path,
+  std::function<void(const Status &, const StatInfo &)> handler)
+{
+  using ::hadoop::hdfs::GetFileInfoRequestProto;
+  using ::hadoop::hdfs::GetFileInfoResponseProto;
+
+  LOG_TRACE(kFileSystem, << "NameNodeOperations::GetFileInfo("
+                           << FMT_THIS_ADDR << ", path=" << path << ") called");
+
+  if (path.empty()) {
+    handler(Status::InvalidArgument("GetFileInfo: argument 'path' cannot be empty"), StatInfo());
+    return;
+  }
+
+  GetFileInfoRequestProto req;
+  req.set_src(path);
+
+  auto resp = std::make_shared<GetFileInfoResponseProto>();
+
+  namenode_.GetFileInfo(&req, resp, [resp, handler, path](const Status &stat) {
+    if (stat.ok()) {
+      // For non-existent files, the server will respond with an OK message but
+      //   no fs in the protobuf.
+      if(resp -> has_fs()){
+          struct StatInfo stat_info;
+          stat_info.path = path;
+          stat_info.full_path = path;
+          HdfsFileStatusProtoToStatInfo(stat_info, resp->fs());
+          handler(stat, stat_info);
+        } else {
+          std::string errormsg = "No such file or directory: " + path;
+          Status statNew = Status::PathNotFound(errormsg.c_str());
+          handler(statNew, StatInfo());
+        }
+    } else {
+      handler(stat, StatInfo());
+    }
+  });
+}
+
+void NameNodeOperations::GetContentSummary(const std::string & path,
+  std::function<void(const Status &, const ContentSummary &)> handler)
+{
+  using ::hadoop::hdfs::GetContentSummaryRequestProto;
+  using ::hadoop::hdfs::GetContentSummaryResponseProto;
+
+  LOG_TRACE(kFileSystem, << "NameNodeOperations::GetContentSummary("
+                           << FMT_THIS_ADDR << ", path=" << path << ") called");
+
+  if (path.empty()) {
+    handler(Status::InvalidArgument("GetContentSummary: argument 'path' cannot be empty"), ContentSummary());
+    return;
+  }
+
+  GetContentSummaryRequestProto req;
+  req.set_path(path);
+
+  auto resp = std::make_shared<GetContentSummaryResponseProto>();
+
+  namenode_.GetContentSummary(&req, resp, [resp, handler, path](const Status &stat) {
+    if (stat.ok()) {
+      // For non-existent files, the server will respond with an OK message but
+      //   no summary in the protobuf.
+      if(resp -> has_summary()){
+          struct ContentSummary content_summary;
+          content_summary.path = path;
+          ContentSummaryProtoToContentSummary(content_summary, resp->summary());
+          handler(stat, content_summary);
+        } else {
+          std::string errormsg = "No such file or directory: " + path;
+          Status statNew = Status::PathNotFound(errormsg.c_str());
+          handler(statNew, ContentSummary());
+        }
+    } else {
+      handler(stat, ContentSummary());
+    }
+  });
+}
+
+void NameNodeOperations::GetFsStats(
+    std::function<void(const Status &, const FsInfo &)> handler) {
+  using ::hadoop::hdfs::GetFsStatusRequestProto;
+  using ::hadoop::hdfs::GetFsStatsResponseProto;
+
+  LOG_TRACE(kFileSystem,
+      << "NameNodeOperations::GetFsStats(" << FMT_THIS_ADDR << ") called");
+
+  GetFsStatusRequestProto req;
+  auto resp = std::make_shared<GetFsStatsResponseProto>();
+
+  namenode_.GetFsStats(&req, resp, [resp, handler](const Status &stat) {
+    if (stat.ok()) {
+      struct FsInfo fs_info;
+      GetFsStatsResponseProtoToFsInfo(fs_info, resp);
+      handler(stat, fs_info);
+    } else {
+      handler(stat, FsInfo());
+    }
+  });
+}
+
+void NameNodeOperations::GetListing(
+    const std::string & path,
+    std::function<void(const Status &, const std::vector<StatInfo> &, bool)> handler,
+    const std::string & start_after) {
+  using ::hadoop::hdfs::GetListingRequestProto;
+  using ::hadoop::hdfs::GetListingResponseProto;
+
+  LOG_TRACE(
+      kFileSystem,
+      << "NameNodeOperations::GetListing(" << FMT_THIS_ADDR << ", path=" << path << ") called");
+
+  if (path.empty()) {
+    std::vector<StatInfo> empty;
+    handler(Status::InvalidArgument("GetListing: argument 'path' cannot be empty"), empty, false);
+    return;
+  }
+
+  GetListingRequestProto req;
+  req.set_src(path);
+  req.set_startafter(start_after.c_str());
+  req.set_needlocation(false);
+
+  auto resp = std::make_shared<GetListingResponseProto>();
+
+  namenode_.GetListing(&req, resp, [resp, handler, path](const Status &stat) {
+    std::vector<StatInfo> stat_infos;
+    if (stat.ok()) {
+      if(resp -> has_dirlist()){
+        for (::hadoop::hdfs::HdfsFileStatusProto const& fs : resp->dirlist().partiallisting()) {
+          StatInfo si;
+          si.path = fs.path();
+          si.full_path = path + fs.path();
+          if(si.full_path.back() != '/'){
+            si.full_path += "/";
+          }
+          HdfsFileStatusProtoToStatInfo(si, fs);
+          stat_infos.push_back(si);
+        }
+        handler(stat, stat_infos, resp->dirlist().remainingentries() > 0);
+      } else {
+        std::string errormsg = "No such file or directory: " + path;
+        handler(Status::PathNotFound(errormsg.c_str()), stat_infos, false);
+      }
+    } else {
+      handler(stat, stat_infos, false);
+    }
+  });
+}
+
+void NameNodeOperations::Mkdirs(const std::string & path, uint16_t permissions, bool createparent,
+  std::function<void(const Status &)> handler)
+{
+  using ::hadoop::hdfs::MkdirsRequestProto;
+  using ::hadoop::hdfs::MkdirsResponseProto;
+
+  LOG_TRACE(kFileSystem,
+      << "NameNodeOperations::Mkdirs(" << FMT_THIS_ADDR << ", path=" << path <<
+      ", permissions=" << permissions << ", createparent=" << createparent << ") called");
+
+  if (path.empty()) {
+    handler(Status::InvalidArgument("Mkdirs: argument 'path' cannot be empty"));
+    return;
+  }
+
+  MkdirsRequestProto req;
+  Status permStatus = FileSystemImpl::CheckValidPermissionMask(permissions);
+  if (!permStatus.ok()) {
+    handler(permStatus);
+    return;
+  }
+  req.set_src(path);
+  hadoop::hdfs::FsPermissionProto *perm = req.mutable_masked();
+  perm->set_perm(permissions);
+  req.set_createparent(createparent);
+
+  auto resp = std::make_shared<MkdirsResponseProto>();
+
+  namenode_.Mkdirs(&req, resp, [resp, handler, path](const Status &stat) {
+    if (stat.ok()) {
+      // Checking resp
+      if(resp -> has_result() && resp ->result() == 1) {
+        handler(stat);
+      } else {
+        //NameNode does not specify why there is no result; in testing this happened when the path was not found
+        std::string errormsg = "No such file or directory: " + path;
+        Status statNew = Status::PathNotFound(errormsg.c_str());
+        handler(statNew);
+      }
+    } else {
+      handler(stat);
+    }
+  });
+}
+
+void NameNodeOperations::Delete(const std::string & path, bool recursive, std::function<void(const Status &)> handler) {
+  using ::hadoop::hdfs::DeleteRequestProto;
+  using ::hadoop::hdfs::DeleteResponseProto;
+
+  LOG_TRACE(kFileSystem,
+      << "NameNodeOperations::Delete(" << FMT_THIS_ADDR << ", path=" << path << ", recursive=" << recursive << ") called");
+
+  if (path.empty()) {
+    handler(Status::InvalidArgument("Delete: argument 'path' cannot be empty"));
+    return;
+  }
+
+  DeleteRequestProto req;
+  req.set_src(path);
+  req.set_recursive(recursive);
+
+  auto resp = std::make_shared<DeleteResponseProto>();
+
+  namenode_.Delete(&req, resp, [resp, handler, path](const Status &stat) {
+    if (stat.ok()) {
+      // Checking resp
+      if(resp -> has_result() && resp ->result() == 1) {
+        handler(stat);
+      } else {
+        //NameNode does not specify why there is no result; in testing this happened when the path was not found
+        std::string errormsg = "No such file or directory: " + path;
+        Status statNew = Status::PathNotFound(errormsg.c_str());
+        handler(statNew);
+      }
+    } else {
+      handler(stat);
+    }
+  });
+}
+
+void NameNodeOperations::Rename(const std::string & oldPath, const std::string & newPath, std::function<void(const Status &)> handler) {
+  using ::hadoop::hdfs::RenameRequestProto;
+  using ::hadoop::hdfs::RenameResponseProto;
+
+  LOG_TRACE(kFileSystem,
+      << "NameNodeOperations::Rename(" << FMT_THIS_ADDR << ", oldPath=" << oldPath << ", newPath=" << newPath << ") called");
+
+  if (oldPath.empty()) {
+    handler(Status::InvalidArgument("Rename: argument 'oldPath' cannot be empty"));
+    return;
+  }
+
+  if (newPath.empty()) {
+    handler(Status::InvalidArgument("Rename: argument 'newPath' cannot be empty"));
+    return;
+  }
+
+  RenameRequestProto req;
+  req.set_src(oldPath);
+  req.set_dst(newPath);
+
+  auto resp = std::make_shared<RenameResponseProto>();
+
+  namenode_.Rename(&req, resp, [resp, handler](const Status &stat) {
+    if (stat.ok()) {
+      // Checking resp
+      if(resp -> has_result() && resp ->result() == 1) {
+        handler(stat);
+      } else {
+        //Since NameNode does not specify why the result is not success, we set a general error
+        std::string errormsg = "oldPath and parent directory of newPath must exist. newPath must not exist.";
+        Status statNew = Status::InvalidArgument(errormsg.c_str());
+        handler(statNew);
+      }
+    } else {
+      handler(stat);
+    }
+  });
+}
+
+void NameNodeOperations::SetPermission(const std::string & path,
+    uint16_t permissions, std::function<void(const Status &)> handler) {
+  using ::hadoop::hdfs::SetPermissionRequestProto;
+  using ::hadoop::hdfs::SetPermissionResponseProto;
+
+  LOG_TRACE(kFileSystem,
+      << "NameNodeOperations::SetPermission(" << FMT_THIS_ADDR << ", path=" << path << ", permissions=" << permissions << ") called");
+
+  if (path.empty()) {
+    handler(Status::InvalidArgument("SetPermission: argument 'path' cannot be empty"));
+    return;
+  }
+  Status permStatus = FileSystemImpl::CheckValidPermissionMask(permissions);
+  if (!permStatus.ok()) {
+    handler(permStatus);
+    return;
+  }
+
+  SetPermissionRequestProto req;
+  req.set_src(path);
+
+  hadoop::hdfs::FsPermissionProto *perm = req.mutable_permission();
+  perm->set_perm(permissions);
+
+  auto resp = std::make_shared<SetPermissionResponseProto>();
+
+  namenode_.SetPermission(&req, resp,
+      [handler](const Status &stat) {
+        handler(stat);
+      });
+}
+
+void NameNodeOperations::SetOwner(const std::string & path,
+    const std::string & username, const std::string & groupname, std::function<void(const Status &)> handler) {
+  using ::hadoop::hdfs::SetOwnerRequestProto;
+  using ::hadoop::hdfs::SetOwnerResponseProto;
+
+  LOG_TRACE(kFileSystem,
+      << "NameNodeOperations::SetOwner(" << FMT_THIS_ADDR << ", path=" << path << ", username=" << username << ", groupname=" << groupname << ") called");
+
+  if (path.empty()) {
+    handler(Status::InvalidArgument("SetOwner: argument 'path' cannot be empty"));
+    return;
+  }
+
+  SetOwnerRequestProto req;
+  req.set_src(path);
+
+  if(!username.empty()) {
+    req.set_username(username);
+  }
+  if(!groupname.empty()) {
+    req.set_groupname(groupname);
+  }
+
+  auto resp = std::make_shared<SetOwnerResponseProto>();
+
+  namenode_.SetOwner(&req, resp,
+      [handler](const Status &stat) {
+        handler(stat);
+      });
+}
+
+void NameNodeOperations::CreateSnapshot(const std::string & path,
+    const std::string & name, std::function<void(const Status &)> handler) {
+  using ::hadoop::hdfs::CreateSnapshotRequestProto;
+  using ::hadoop::hdfs::CreateSnapshotResponseProto;
+
+  LOG_TRACE(kFileSystem,
+      << "NameNodeOperations::CreateSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called");
+
+  if (path.empty()) {
+    handler(Status::InvalidArgument("CreateSnapshot: argument 'path' cannot be empty"));
+    return;
+  }
+
+  CreateSnapshotRequestProto req;
+  req.set_snapshotroot(path);
+  if (!name.empty()) {
+    req.set_snapshotname(name);
+  }
+
+  auto resp = std::make_shared<CreateSnapshotResponseProto>();
+
+  namenode_.CreateSnapshot(&req, resp,
+      [handler](const Status &stat) {
+        handler(stat);
+      });
+}
+
+void NameNodeOperations::DeleteSnapshot(const std::string & path,
+    const std::string & name, std::function<void(const Status &)> handler) {
+  using ::hadoop::hdfs::DeleteSnapshotRequestProto;
+  using ::hadoop::hdfs::DeleteSnapshotResponseProto;
+
+  LOG_TRACE(kFileSystem,
+      << "NameNodeOperations::DeleteSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called");
+
+  if (path.empty()) {
+    handler(Status::InvalidArgument("DeleteSnapshot: argument 'path' cannot be empty"));
+    return;
+  }
+  if (name.empty()) {
+    handler(Status::InvalidArgument("DeleteSnapshot: argument 'name' cannot be empty"));
+    return;
+  }
+
+  DeleteSnapshotRequestProto req;
+  req.set_snapshotroot(path);
+  req.set_snapshotname(name);
+
+  auto resp = std::make_shared<DeleteSnapshotResponseProto>();
+
+  namenode_.DeleteSnapshot(&req, resp,
+      [handler](const Status &stat) {
+        handler(stat);
+      });
+}
+
+void NameNodeOperations::RenameSnapshot(const std::string & path, const std::string & old_name,
+    const std::string & new_name, std::function<void(const Status &)> handler) {
+  using ::hadoop::hdfs::RenameSnapshotRequestProto;
+  using ::hadoop::hdfs::RenameSnapshotResponseProto;
+
+  LOG_TRACE(kFileSystem,
+      << "NameNodeOperations::RenameSnapshot(" << FMT_THIS_ADDR << ", path=" << path <<
+      ", old_name=" << old_name << ", new_name=" << new_name << ") called");
+
+  if (path.empty()) {
+    handler(Status::InvalidArgument("RenameSnapshot: argument 'path' cannot be empty"));
+    return;
+  }
+  if (old_name.empty()) {
+    handler(Status::InvalidArgument("RenameSnapshot: argument 'old_name' cannot be empty"));
+    return;
+  }
+  if (new_name.empty()) {
+    handler(Status::InvalidArgument("RenameSnapshot: argument 'new_name' cannot be empty"));
+    return;
+  }
+
+  RenameSnapshotRequestProto req;
+  req.set_snapshotroot(path);
+  req.set_snapshotoldname(old_name);
+  req.set_snapshotnewname(new_name);
+
+  auto resp = std::make_shared<RenameSnapshotResponseProto>();
+
+  namenode_.RenameSnapshot(&req, resp,
+      [handler](const Status &stat) {
+        handler(stat);
+      });
+}
+
+void NameNodeOperations::AllowSnapshot(const std::string & path, std::function<void(const Status &)> handler) {
+  using ::hadoop::hdfs::AllowSnapshotRequestProto;
+  using ::hadoop::hdfs::AllowSnapshotResponseProto;
+
+  LOG_TRACE(kFileSystem,
+      << "NameNodeOperations::AllowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called");
+
+  if (path.empty()) {
+    handler(Status::InvalidArgument("AllowSnapshot: argument 'path' cannot be empty"));
+    return;
+  }
+
+  AllowSnapshotRequestProto req;
+  req.set_snapshotroot(path);
+
+  auto resp = std::make_shared<AllowSnapshotResponseProto>();
+
+  namenode_.AllowSnapshot(&req, resp,
+      [handler](const Status &stat) {
+        handler(stat);
+      });
+}
+
+void NameNodeOperations::DisallowSnapshot(const std::string & path, std::function<void(const Status &)> handler) {
+  using ::hadoop::hdfs::DisallowSnapshotRequestProto;
+  using ::hadoop::hdfs::DisallowSnapshotResponseProto;
+
+  LOG_TRACE(kFileSystem,
+      << "NameNodeOperations::DisallowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called");
+
+  if (path.empty()) {
+    handler(Status::InvalidArgument("DisallowSnapshot: argument 'path' cannot be empty"));
+    return;
+  }
+
+  DisallowSnapshotRequestProto req;
+  req.set_snapshotroot(path);
+
+  auto resp = std::make_shared<DisallowSnapshotResponseProto>();
+
+  namenode_.DisallowSnapshot(&req, resp,
+      [handler](const Status &stat) {
+        handler(stat);
+      });
+}
+
+void NameNodeOperations::SetFsEventCallback(fs_event_callback callback) {
+  engine_->SetFsEventCallback(callback);
+}
+
+void NameNodeOperations::HdfsFileStatusProtoToStatInfo(
+    hdfs::StatInfo & stat_info,
+    const ::hadoop::hdfs::HdfsFileStatusProto & fs) {
+  stat_info.file_type = fs.filetype();
+  stat_info.length = fs.length();
+  stat_info.permissions = fs.permission().perm();
+  stat_info.owner = fs.owner();
+  stat_info.group = fs.group();
+  stat_info.modification_time = fs.modification_time();
+  stat_info.access_time = fs.access_time();
+  stat_info.symlink = fs.symlink();
+  stat_info.block_replication = fs.block_replication();
+  stat_info.blocksize = fs.blocksize();
+  stat_info.fileid = fs.fileid();
+  stat_info.children_num = fs.childrennum();
+}
+
+void NameNodeOperations::ContentSummaryProtoToContentSummary(
+    hdfs::ContentSummary & content_summary,
+    const ::hadoop::hdfs::ContentSummaryProto & csp) {
+  content_summary.length = csp.length();
+  content_summary.filecount = csp.filecount();
+  content_summary.directorycount = csp.directorycount();
+  content_summary.quota = csp.quota();
+  content_summary.spaceconsumed = csp.spaceconsumed();
+  content_summary.spacequota = csp.spacequota();
+}
+
+void NameNodeOperations::GetFsStatsResponseProtoToFsInfo(
+    hdfs::FsInfo & fs_info,
+    const std::shared_ptr<::hadoop::hdfs::GetFsStatsResponseProto> & fs) {
+  fs_info.capacity = fs->capacity();
+  fs_info.used = fs->used();
+  fs_info.remaining = fs->remaining();
+  fs_info.under_replicated = fs->under_replicated();
+  fs_info.corrupt_blocks = fs->corrupt_blocks();
+  fs_info.missing_blocks = fs->missing_blocks();
+  fs_info.missing_repl_one_blocks = fs->missing_repl_one_blocks();
+  if(fs->has_blocks_in_future()){
+    fs_info.blocks_in_future = fs->blocks_in_future();
+  }
+}
+
+}
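
Each RPC wrapper in namenode_operations.cc follows the same four steps:
validate the arguments (reporting failures through the handler, never by
throwing), populate a request protobuf, allocate the response protobuf in a
shared_ptr so it outlives the call, and pass the generated stub a lambda that
folds the response into a Status. A condensed, self-contained sketch of that
shape, with stand-in request/response/stub types replacing the generated
protobuf code (only hdfs::Status is the library's own type):

    #include <functional>
    #include <memory>
    #include <string>

    struct DeleteRequest { std::string src; bool recursive = false; };
    struct DeleteResponse { bool result = false; };

    // Stand-in for one generated stub method. Like the real stubs it is
    // handed a raw pointer to the request and a shared_ptr to the response;
    // presumably the engine serializes *req before returning, which is why a
    // stack-local request is safe in the code above. The sketch completes
    // inline.
    void StubDelete(const DeleteRequest *req,
                    std::shared_ptr<DeleteResponse> resp,
                    std::function<void(const hdfs::Status &)> cb) {
      resp->result = !req->src.empty();
      cb(hdfs::Status::OK());
    }

    void Delete(const std::string &path, bool recursive,
                std::function<void(const hdfs::Status &)> handler) {
      if (path.empty()) {                  // 1. validate via the handler
        handler(hdfs::Status::InvalidArgument("path cannot be empty"));
        return;
      }
      DeleteRequest req{path, recursive};  // 2. populate the request
      auto resp = std::make_shared<DeleteResponse>();  // 3. pin the response
      StubDelete(&req, resp, [resp, handler](const hdfs::Status &stat) {
        // 4. fold the application-level result into the Status
        if (stat.ok() && !resp->result) {
          handler(hdfs::Status::PathNotFound("no result from NameNode"));
        } else {
          handler(stat);
        }
      });
    }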

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h
new file mode 100644
index 0000000..f4caa18
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBHDFSPP_LIB_FS_NAMENODEOPERATIONS_H_
+#define LIBHDFSPP_LIB_FS_NAMENODEOPERATIONS_H_
+
+#include "rpc/rpc_engine.h"
+#include "hdfspp/statinfo.h"
+#include "hdfspp/fsinfo.h"
+#include "hdfspp/content_summary.h"
+#include "common/namenode_info.h"
+#include "ClientNamenodeProtocol.pb.h"
+#include "ClientNamenodeProtocol.hrpc.inl"
+
+
+namespace hdfs {
+
+/**
+* NameNodeConnection: abstracts the details of communicating with a NameNode
+* and the implementation of the communications protocol.
+*
+* Will eventually handle retry and failover.
+*
+* Threading model: thread-safe; all operations can be called concurrently
+* Lifetime: owned by a FileSystemImpl
+*/
+
+class NameNodeOperations {
+public:
+  MEMCHECKED_CLASS(NameNodeOperations)
+  NameNodeOperations(::asio::io_service *io_service, const Options &options,
+            const std::string &client_name, const std::string &user_name,
+            const char *protocol_name, int protocol_version) :
+  io_service_(io_service),
+  engine_(std::make_shared<RpcEngine>(io_service, options, client_name, user_name, protocol_name, protocol_version)),
+  namenode_(engine_), options_(options) {}
+
+
+  void Connect(const std::string &cluster_name,
+               const std::vector<ResolvedNamenodeInfo> &servers,
+               std::function<void(const Status &)> &&handler);
+
+  bool CancelPendingConnect();
+
+  void GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length,
+    std::function<void(const Status &, std::shared_ptr<const struct FileInfo>)> handler);
+
+  void GetPreferredBlockSize(const std::string & path,
+    std::function<void(const Status &, const uint64_t)> handler);
+
+  void SetReplication(const std::string & path, int16_t replication,
+    std::function<void(const Status &)> handler);
+
+  void SetTimes(const std::string & path, uint64_t mtime, uint64_t atime,
+    std::function<void(const Status &)> handler);
+
+  void GetFileInfo(const std::string & path,
+    std::function<void(const Status &, const StatInfo &)> handler);
+
+  void GetContentSummary(const std::string & path,
+    std::function<void(const Status &, const ContentSummary &)> handler);
+
+  void GetFsStats(std::function<void(const Status &, const FsInfo &)> handler);
+
+  // start_after="" for initial call
+  void GetListing(const std::string & path,
+        std::function<void(const Status &, const std::vector<StatInfo> &, bool)> handler,
+        const std::string & start_after = "");
+
+  void Mkdirs(const std::string & path, uint16_t permissions, bool createparent,
+    std::function<void(const Status &)> handler);
+
+  void Delete(const std::string & path, bool recursive,
+      std::function<void(const Status &)> handler);
+
+  void Rename(const std::string & oldPath, const std::string & newPath,
+      std::function<void(const Status &)> handler);
+
+  void SetPermission(const std::string & path, uint16_t permissions,
+      std::function<void(const Status &)> handler);
+
+  void SetOwner(const std::string & path, const std::string & username,
+      const std::string & groupname, std::function<void(const Status &)> handler);
+
+  void CreateSnapshot(const std::string & path, const std::string & name,
+      std::function<void(const Status &)> handler);
+
+  void DeleteSnapshot(const std::string & path, const std::string & name,
+      std::function<void(const Status &)> handler);
+
+  void RenameSnapshot(const std::string & path, const std::string & old_name, const std::string & new_name,
+      std::function<void(const Status &)> handler);
+
+  void AllowSnapshot(const std::string & path,
+      std::function<void(const Status &)> handler);
+
+  void DisallowSnapshot(const std::string & path,
+      std::function<void(const Status &)> handler);
+
+  void SetFsEventCallback(fs_event_callback callback);
+
+private:
+  static void HdfsFileStatusProtoToStatInfo(hdfs::StatInfo & si, const ::hadoop::hdfs::HdfsFileStatusProto & fs);
+  static void ContentSummaryProtoToContentSummary(hdfs::ContentSummary & content_summary, const ::hadoop::hdfs::ContentSummaryProto & csp);
+  static void DirectoryListingProtoToStatInfo(std::shared_ptr<std::vector<StatInfo>> stat_infos, const ::hadoop::hdfs::DirectoryListingProto & dl);
+  static void GetFsStatsResponseProtoToFsInfo(hdfs::FsInfo & fs_info, const std::shared_ptr<::hadoop::hdfs::GetFsStatsResponseProto> & fs);
+
+  ::asio::io_service * io_service_;
+
+  // This is the only permanent owner of the RpcEngine, however the RPC layer
+  // needs to reference count it to prevent races during FileSystem destruction.
+  // In order to do this they hold weak_ptrs and promote them to shared_ptr
+  // when calling non-blocking RpcEngine methods e.g. get_client_id().
+  std::shared_ptr<RpcEngine> engine_;
+
+  // Automatically generated methods for RPC calls.  See protoc_gen_hrpc.cc
+  ClientNamenodeProtocol namenode_;
+  const Options options_;
+};
+}
+
+#endif
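
The ownership note on engine_ above describes the standard weak_ptr promotion
idiom: the RPC internals hold weak_ptrs to the RpcEngine and promote them to
shared_ptrs only for the duration of a call. A minimal sketch of that idiom
(RpcConnection and the method body are illustrative, not the library's actual
RPC code):

    #include <memory>

    struct RpcEngine { /* owned via shared_ptr by NameNodeOperations */ };

    struct RpcConnection {
      std::weak_ptr<RpcEngine> engine_;  // RPC layer holds a weak reference

      void OnEvent() {
        // lock() either yields a shared_ptr that pins the engine for the
        // duration of this call, or nullptr if the FileSystem is already
        // tearing down; either way there is no race with the destructor.
        if (auto engine = engine_.lock()) {
          (void)engine;  // safe to use *engine within this scope
        }
      }
    };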

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/proto/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/proto/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/proto/CMakeLists.txt
new file mode 100644
index 0000000..2eff301
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/proto/CMakeLists.txt
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+set(PROTOBUF_IMPORT_DIRS ${PROTO_HDFS_DIR} ${PROTO_HADOOP_DIR})
+
+protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS
+  ${PROTO_HDFS_DIR}/datatransfer.proto
+  ${PROTO_HDFS_DIR}/ClientDatanodeProtocol.proto
+  ${PROTO_HDFS_DIR}/ClientNamenodeProtocol.proto
+  ${PROTO_HDFS_DIR}/acl.proto
+  ${PROTO_HDFS_DIR}/datatransfer.proto
+  ${PROTO_HDFS_DIR}/encryption.proto
+  ${PROTO_HDFS_DIR}/erasurecoding.proto
+  ${PROTO_HDFS_DIR}/hdfs.proto
+  ${PROTO_HDFS_DIR}/inotify.proto
+  ${PROTO_HDFS_DIR}/xattr.proto
+  ${PROTO_HDFS_DIR}/ReconfigurationProtocol.proto
+  ${PROTO_HADOOP_DIR}/IpcConnectionContext.proto
+  ${PROTO_HADOOP_DIR}/ProtobufRpcEngine.proto
+  ${PROTO_HADOOP_DIR}/RpcHeader.proto
+  ${PROTO_HADOOP_DIR}/Security.proto
+)
+
+add_executable(protoc-gen-hrpc protoc_gen_hrpc.cc)
+target_link_libraries(protoc-gen-hrpc ${PROTOBUF_PROTOC_LIBRARY} ${PROTOBUF_LIBRARY})
+
+function(GEN_HRPC SRCS)
+  if(NOT ARGN)
+    message(SEND_ERROR "Error: GEN_HRPC() called without any proto files")
+    return()
+  endif()
+
+  if(DEFINED PROTOBUF_IMPORT_DIRS)
+    foreach(DIR ${PROTOBUF_IMPORT_DIRS})
+      get_filename_component(ABS_PATH ${DIR} ABSOLUTE)
+      list(FIND _protobuf_include_path ${ABS_PATH} _contains_already)
+      if(${_contains_already} EQUAL -1)
+          list(APPEND _protobuf_include_path -I ${ABS_PATH})
+      endif()
+    endforeach()
+  endif()
+
+  set(${SRCS})
+
+  foreach(FIL ${ARGN})
+    get_filename_component(ABS_FIL ${FIL} ABSOLUTE)
+    get_filename_component(FIL_WE ${FIL} NAME_WE)
+
+    list(APPEND ${SRCS} "${CMAKE_CURRENT_BINARY_DIR}/${FIL_WE}.hrpc.inl")
+
+    add_custom_command(
+      OUTPUT "${CMAKE_CURRENT_BINARY_DIR}/${FIL_WE}.hrpc.inl"
+      COMMAND ${PROTOBUF_PROTOC_EXECUTABLE}
+      ARGS --plugin=protoc-gen-hrpc=${CMAKE_CURRENT_BINARY_DIR}/protoc-gen-hrpc --hrpc_out=${CMAKE_CURRENT_BINARY_DIR} ${_protobuf_include_path} ${ABS_FIL}
+      DEPENDS ${ABS_FIL} ${PROTOBUF_PROTOC_EXECUTABLE} protoc-gen-hrpc
+      COMMENT "Running HRPC protocol buffer compiler on ${FIL}"
+      VERBATIM )
+  endforeach()
+
+  set_source_files_properties(${${SRCS}} PROPERTIES GENERATED TRUE)
+  set(${SRCS} ${${SRCS}} PARENT_SCOPE)
+endfunction()
+
+gen_hrpc(HRPC_SRCS
+  ${PROTO_HDFS_DIR}/ClientNamenodeProtocol.proto
+)
+
+add_library(proto_obj OBJECT ${PROTO_SRCS} ${PROTO_HDRS} ${HRPC_SRCS})
+if(HADOOP_BUILD)
+  add_dependencies(proto_obj copy_hadoop_files)
+endif(HADOOP_BUILD)
+add_library(proto $<TARGET_OBJECTS:proto_obj>)
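
GEN_HRPC above runs the protoc-gen-hrpc plugin (defined in the next file) over
ClientNamenodeProtocol.proto, emitting ClientNamenodeProtocol.hrpc.inl, which
namenode_operations.h includes directly. Judging from the Print calls in
StubGenerator::EmitService below, the emitted code plausibly has this shape (a
reconstruction for orientation, not the actual generated output):

    // GENERATED AUTOMATICALLY. DO NOT MODIFY.
    class ClientNamenodeProtocol {
    private:
      std::shared_ptr<::hdfs::RpcEngine> engine_;
    public:
      typedef std::function<void(const ::hdfs::Status &)> Callback;
      typedef ::google::protobuf::MessageLite Message;
      inline ClientNamenodeProtocol(std::shared_ptr<::hdfs::RpcEngine> engine)
        : engine_(engine) {}

      // ...one inline method per RPC in the .proto service, emitted by
      // EmitMethod() (its output is truncated below), each starting:
      //   inline void Delete(const Message *req, ...
    };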

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/proto/protoc_gen_hrpc.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/proto/protoc_gen_hrpc.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/proto/protoc_gen_hrpc.cc
new file mode 100644
index 0000000..e7355c0
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/proto/protoc_gen_hrpc.cc
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "protobuf/cpp_helpers.h"
+
+#include <google/protobuf/compiler/code_generator.h>
+#include <google/protobuf/compiler/plugin.h>
+#include <google/protobuf/descriptor.h>
+#include <google/protobuf/io/printer.h>
+#include <google/protobuf/io/zero_copy_stream.h>
+#include <google/protobuf/stubs/common.h>
+
+#include <memory>
+
+using ::google::protobuf::FileDescriptor;
+using ::google::protobuf::MethodDescriptor;
+using ::google::protobuf::ServiceDescriptor;
+using ::google::protobuf::compiler::CodeGenerator;
+using ::google::protobuf::compiler::GeneratorContext;
+using ::google::protobuf::io::Printer;
+using ::google::protobuf::io::ZeroCopyOutputStream;
+
+class StubGenerator : public CodeGenerator {
+public:
+  virtual bool Generate(const FileDescriptor *file, const std::string &,
+                        GeneratorContext *ctx,
+                        std::string *error) const override;
+
+private:
+  void EmitService(const ServiceDescriptor *service, Printer *out) const;
+  void EmitMethod(const MethodDescriptor *method, Printer *out) const;
+};
+
+bool StubGenerator::Generate(const FileDescriptor *file, const std::string &,
+                             GeneratorContext *ctx, std::string *) const {
+  namespace pb = ::google::protobuf;
+  std::unique_ptr<ZeroCopyOutputStream> os(
+      ctx->Open(StripProto(file->name()) + ".hrpc.inl"));
+  Printer out(os.get(), '$');
+  for (int i = 0; i < file->service_count(); ++i) {
+    const ServiceDescriptor *service = file->service(i);
+    EmitService(service, &out);
+  }
+  return true;
+}
+
+void StubGenerator::EmitService(const ServiceDescriptor *service,
+                                Printer *out) const {
+  out->Print("\n// GENERATED AUTOMATICALLY. DO NOT MODIFY.\n"
+             "class $service$ {\n"
+             "private:\n"
+             "  std::shared_ptr<::hdfs::RpcEngine> engine_;\n"
+             "public:\n"
+             "  typedef std::function<void(const ::hdfs::Status &)> 
Callback;\n"
+             "  typedef ::google::protobuf::MessageLite Message;\n"
+             "  inline $service$(std::shared_ptr<::hdfs::RpcEngine> engine)\n"
+             "    : engine_(engine) {}\n",
+             "service", service->name());
+  for (int i = 0; i < service->method_count(); ++i) {
+    const MethodDescriptor *method = service->method(i);
+    EmitMethod(method, out);
+  }
+  out->Print("};\n");
+}
+
+void StubGenerator::EmitMethod(const MethodDescriptor *method,
+                               Printer *out) const {
+  out->Print(
+      "\n  inline void $camel_method$(const Message *req, "
+      "const std::shared_ptr<Message> &resp, "
+      "const Callback &handler) {\n"
+      "    engine_->AsyncRpc(\"$method$\", req, resp, handler);\n"
+      "  }\n",
+      "camel_method", ToCamelCase(method->name()), "method", method->name());
+}
+
+int main(int argc, char *argv[]) {
+  StubGenerator generator;
+  return google::protobuf::compiler::PluginMain(argc, argv, &generator);
+}
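
To make the Print templates above concrete: for a service such as
ClientNamenodeProtocol with a method getBlockLocations, the plugin would emit
roughly the following into ClientNamenodeProtocol.hrpc.inl. This is
reconstructed by hand from EmitService/EmitMethod rather than copied from an
actual build, so treat it as a sketch:

    // GENERATED AUTOMATICALLY. DO NOT MODIFY.
    class ClientNamenodeProtocol {
    private:
      std::shared_ptr<::hdfs::RpcEngine> engine_;
    public:
      typedef std::function<void(const ::hdfs::Status &)> Callback;
      typedef ::google::protobuf::MessageLite Message;
      inline ClientNamenodeProtocol(std::shared_ptr<::hdfs::RpcEngine> engine)
        : engine_(engine) {}

      // One of these is emitted per method; ToCamelCase capitalizes the name.
      inline void GetBlockLocations(const Message *req,
          const std::shared_ptr<Message> &resp,
          const Callback &handler) {
        engine_->AsyncRpc("getBlockLocations", req, resp, handler);
      }
    };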

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt
new file mode 100644
index 0000000..2bcfd92
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+add_library(reader_obj OBJECT block_reader.cc datatransfer.cc readergroup.cc)
+add_dependencies(reader_obj proto)
+add_library(reader $<TARGET_OBJECTS:reader_obj>)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc
new file mode 100644
index 0000000..ca7715d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc
@@ -0,0 +1,571 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "reader/block_reader.h"
+#include "reader/datatransfer.h"
+#include "common/continuation/continuation.h"
+#include "common/continuation/asio.h"
+#include "common/logging.h"
+#include "common/util.h"
+
+#include <future>
+
+namespace hdfs {
+
+#define FMT_CONT_AND_PARENT_ADDR "this=" << (void*)this << ", parent=" << (void*)parent_
+#define FMT_CONT_AND_READER_ADDR "this=" << (void*)this << ", reader=" << (void*)reader_
+#define FMT_THIS_ADDR "this=" << (void*)this
+
+
+// Stuff an OpReadBlockProto message with required fields.
+hadoop::hdfs::OpReadBlockProto ReadBlockProto(const std::string &client_name,
+    bool verify_checksum, const hadoop::common::TokenProto *token,
+    const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, uint64_t offset)
+{
+  using namespace hadoop::hdfs;
+  using namespace hadoop::common;
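+  // The set_allocated_* calls below hand ownership of the heap-allocated
+  // sub-messages to the enclosing message, which deletes them when it is
+  // destroyed, so no explicit cleanup is needed in this function.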
+  BaseHeaderProto *base_h = new BaseHeaderProto();
+  base_h->set_allocated_block(new ExtendedBlockProto(*block));
+  if (token) {
+    base_h->set_allocated_token(new TokenProto(*token));
+  }
+  ClientOperationHeaderProto *h = new ClientOperationHeaderProto();
+  h->set_clientname(client_name);
+  h->set_allocated_baseheader(base_h);
+
+  OpReadBlockProto p;
+  p.set_allocated_header(h);
+  p.set_offset(offset);
+  p.set_len(length);
+  p.set_sendchecksums(verify_checksum);
+  // TODO: p.set_allocated_cachingstrategy();
+  return p;
+}
+
+//
+//  Notes about the BlockReader and associated object lifecycles (9/29/16)
+//  -We have several stages in the read pipeline.  Each stage represents a
+//   logical step in the HDFS block transfer logic.  They are implemented as
+//   continuations for now, and in some cases a stage may have a nested
+//   continuation as well.  It's important to make sure that continuations,
+//   nested or otherwise, cannot outlive the objects they depend on.
+//
+//  -The BlockReader holds a shared_ptr to the DataNodeConnection that's used
+//   in each pipeline stage.  The connection object must never be destroyed
+//   while operations are pending on the ASIO side (see HDFS-10931).  In order
+//   to prevent a state where the BlockReader or one of the corresponding
+//   pipelines outlives the connection, each pipeline stage must explicitly
+//   hold a shared pointer copied from BlockReaderImpl::dn_.
+//
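+//  Distilled form of the keep-alive pattern used by every stage below
+//  (a sketch, not additional code in this patch):
+//
+//  struct SomeStage : continuation::Continuation {
+//    SomeStage(BlockReaderImpl *parent)
+//        : parent_(parent), shared_conn_(parent->dn_) {}  // copy the shared_ptr
+//    void Run(const Next &next) override {
+//      // Any handler passed to ASIO must capture shared_conn_ (or a copy of
+//      // it) so the DataNodeConnection outlives the pending async operation.
+//    }
+//    BlockReaderImpl *parent_;
+//    std::shared_ptr<DataNodeConnection> shared_conn_;
+//  };
+//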
+
+
+static int8_t unsecured_request_block_header[3] = {0, kDataTransferVersion, Operation::kReadBlock};
+
+void BlockReaderImpl::AsyncRequestBlock(const std::string &client_name,
+    const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
+    uint64_t offset, const std::function<void(Status)> &handler)
+{
+  LOG_TRACE(kBlockReader, << "BlockReaderImpl::AsyncRequestBlock("
+                          << FMT_THIS_ADDR << ", ..., length="
+                          << length << ", offset=" << offset << ", ...) called");
+
+  // The total number of bytes that we need to transfer from the DN is
+  // the amount that the user wants (bytes_to_read_), plus the padding at
+  // the beginning in order to chunk-align. Note that the DN may elect
+  // to send more than this amount if the read starts/ends mid-chunk.
+  bytes_to_read_ = length;
+
+  struct State {
+    std::string header;
+    hadoop::hdfs::OpReadBlockProto request;
+    hadoop::hdfs::BlockOpResponseProto response;
+  };
+
+  auto m = continuation::Pipeline<State>::Create(cancel_state_);
+  State *s = &m->state();
+
+  s->request = ReadBlockProto(client_name, options_.verify_checksum,
+                              dn_->token_.get(), block, length, offset);
+
+  s->header = std::string((const char*)unsecured_request_block_header, 3);
+
+  bool serialize_success = true;
+  s->header += SerializeDelimitedProtobufMessage(&s->request, &serialize_success);
+
+  if(!serialize_success) {
+    handler(Status::Error("Unable to serialize protobuf message"));
+    return;
+  }
+
+  auto read_pb_message =
+      new continuation::ReadDelimitedPBMessageContinuation<AsyncStream, 16384>(dn_, &s->response);
+
+  m->Push(asio_continuation::Write(dn_, asio::buffer(s->header))).Push(read_pb_message);
+
+  m->Run([this, handler, offset](const Status &status, const State &s) {
+    Status stat = status;
+    if (stat.ok()) {
+      const auto &resp = s.response;
+
+      if(this->event_handlers_) {
+        event_response event_resp = this->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
+#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
+        if (stat.ok() && event_resp.response_type() == event_response::kTest_Error) {
+          stat = Status::Error("Test error");
+        }
+#endif
+      }
+
+      if (stat.ok() && resp.status() == ::hadoop::hdfs::Status::SUCCESS) {
+        if (resp.has_readopchecksuminfo()) {
+          const auto &checksum_info = resp.readopchecksuminfo();
+          chunk_padding_bytes_ = offset - checksum_info.chunkoffset();
+        }
+        state_ = kReadPacketHeader;
+      } else {
+        stat = Status::Error(s.response.message().c_str());
+      }
+    }
+    handler(stat);
+  });
+}
+
+Status BlockReaderImpl::RequestBlock(const std::string &client_name,
+                                     const hadoop::hdfs::ExtendedBlockProto *block,
+                                     uint64_t length, uint64_t offset)
+{
+  LOG_TRACE(kBlockReader, << "BlockReaderImpl::RequestBlock("
+                          << FMT_THIS_ADDR << ", ..., length="
+                          << length << ", offset=" << offset << ") called");
+
+  auto stat = std::make_shared<std::promise<Status>>();
+  std::future<Status> future(stat->get_future());
+  AsyncRequestBlock(client_name, block, length, offset,
+                [stat](const Status &status) { stat->set_value(status); });
+  return future.get();
+}
+
+struct BlockReaderImpl::ReadPacketHeader : continuation::Continuation
+{
+  ReadPacketHeader(BlockReaderImpl *parent) : parent_(parent), shared_conn_(parent->dn_) {}
+
+  virtual void Run(const Next &next) override {
+    LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadPacketHeader::Run("
+                            << FMT_CONT_AND_PARENT_ADDR << ") called");
+
+    parent_->packet_data_read_bytes_ = 0;
+    parent_->packet_len_ = 0;
+    auto handler = [next, this](const asio::error_code &ec, size_t) {
+      Status status;
+      if (ec) {
+        status = Status(ec.value(), ec.message().c_str());
+      } else {
+        parent_->packet_len_ = packet_length();
+        parent_->header_.Clear();
+        bool v = parent_->header_.ParseFromArray(&buf_[kHeaderStart],
+                                                 header_length());
+        assert(v && "Failed to parse the header");
+        (void)v; //avoids unused variable warning
+        parent_->state_ = kReadChecksum;
+      }
+      if(parent_->event_handlers_) {
+        event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
+#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
+        if (status.ok() && event_resp.response_type() == event_response::kTest_Error) {
+          status = Status::Error("Test error");
+        }
+#endif
+      }
+      next(status);
+    };
+
+    asio::async_read(*parent_->dn_, asio::buffer(buf_),
+                     std::bind(&ReadPacketHeader::CompletionHandler, this,
+                               std::placeholders::_1, std::placeholders::_2), handler);
+  }
+
+private:
+  static const size_t kMaxHeaderSize = 512;
+  static const size_t kPayloadLenOffset = 0;
+  static const size_t kPayloadLenSize = sizeof(int32_t);
+  static const size_t kHeaderLenOffset = 4;
+  static const size_t kHeaderLenSize = sizeof(int16_t);
+  static const size_t kHeaderStart = kPayloadLenSize + kHeaderLenSize;
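+
+  // Wire layout of the packet preamble described by the constants above:
+  //   bytes 0-3:  big-endian uint32 payload length (checksums + data + the
+  //               4 bytes of this length field itself)
+  //   bytes 4-5:  big-endian uint16 length of the serialized PacketHeaderProto
+  //   bytes 6...: the PacketHeaderProto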
+
+  BlockReaderImpl *parent_;
+  std::array<char, kMaxHeaderSize> buf_;
+
+  size_t packet_length() const {
+    return ntohl(*reinterpret_cast<const uint32_t *>(&buf_[kPayloadLenOffset]));
+  }
+
+  size_t header_length() const {
+    return ntohs(*reinterpret_cast<const uint16_t *>(&buf_[kHeaderLenOffset]));
+  }
+
+  size_t CompletionHandler(const asio::error_code &ec, size_t transferred) {
+    if (ec) {
+      return 0;
+    } else if (transferred < kHeaderStart) {
+      return kHeaderStart - transferred;
+    } else {
+      return kHeaderStart + header_length() - transferred;
+    }
+  }
+
+  // Keep the DN connection alive
+  std::shared_ptr<DataNodeConnection> shared_conn_;
+};
+
+struct BlockReaderImpl::ReadChecksum : continuation::Continuation
+{
+  ReadChecksum(BlockReaderImpl *parent) : parent_(parent), shared_conn_(parent->dn_) {}
+
+  virtual void Run(const Next &next) override {
+    LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadChecksum::Run("
+                            << FMT_CONT_AND_PARENT_ADDR << ") called");
+
+    auto parent = parent_;
+    if (parent->state_ != kReadChecksum) {
+      next(Status::OK());
+      return;
+    }
+
+    std::shared_ptr<DataNodeConnection> keep_conn_alive_ = shared_conn_;
+
+    auto handler = [parent, next, this, keep_conn_alive_](const asio::error_code &ec, size_t)
+    {
+      Status status;
+      if (ec) {
+        status = Status(ec.value(), ec.message().c_str());
+      } else {
+        parent->state_ = parent->chunk_padding_bytes_ ? kReadPadding : kReadData;
+      }
+      if(parent->event_handlers_) {
+        event_response event_resp = parent->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
+#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
+        if (status.ok() && event_resp.response_type() == event_response::kTest_Error) {
+          status = Status::Error("Test error");
+        }
+#endif
+      }
+      next(status);
+    };
+
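+    // Checksum section size = payload length - 4-byte length field - data
+    // length; packet_len_ was read from the packet preamble and counts the
+    // length field itself.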
+    parent->checksum_.resize(parent->packet_len_ - sizeof(int) - parent->header_.datalen());
+
+    asio::async_read(*parent->dn_, asio::buffer(parent->checksum_), handler);
+  }
+
+private:
+  BlockReaderImpl *parent_;
+
+  // Keep the DataNodeConnection alive
+  std::shared_ptr<DataNodeConnection> shared_conn_;
+};
+
+struct BlockReaderImpl::ReadData : continuation::Continuation
+{
+  ReadData(BlockReaderImpl *parent, std::shared_ptr<size_t> bytes_transferred,
+        const asio::mutable_buffers_1 &buf) : parent_(parent),
+        bytes_transferred_(bytes_transferred), buf_(buf), shared_conn_(parent->dn_)
+  {
+    buf_.begin();
+  }
+
+  ~ReadData() {
+    buf_.end();
+  }
+
+  virtual void Run(const Next &next) override {
+    LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadData::Run("
+                            << FMT_CONT_AND_PARENT_ADDR << ") called");
+    auto handler =
+        [next, this](const asio::error_code &ec, size_t transferred) {
+          Status status;
+          if (ec) {
+            status = Status(ec.value(), ec.message().c_str());
+          }
+
+          *bytes_transferred_ += transferred;
+          parent_->bytes_to_read_ -= transferred;
+          parent_->packet_data_read_bytes_ += transferred;
+
+          if (parent_->packet_data_read_bytes_ >= parent_->header_.datalen()) {
+            parent_->state_ = kReadPacketHeader;
+          }
+
+          if(parent_->event_handlers_) {
+            event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
+#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
+            if (status.ok() && event_resp.response_type() == event_response::kTest_Error) {
+                status = Status::Error("Test error");
+            }
+#endif
+          }
+          next(status);
+        };
+
+    auto data_len = parent_->header_.datalen() - parent_->packet_data_read_bytes_;
+
+    asio::async_read(*parent_->dn_, buf_, asio::transfer_exactly(data_len), handler);
+  }
+
+private:
+  BlockReaderImpl *parent_;
+  std::shared_ptr<size_t> bytes_transferred_;
+  const asio::mutable_buffers_1 buf_;
+
+  // Keep DNConnection alive.
+  std::shared_ptr<DataNodeConnection> shared_conn_;
+};
+
+struct BlockReaderImpl::ReadPadding : continuation::Continuation
+{
+  ReadPadding(BlockReaderImpl *parent) : parent_(parent),
+        padding_(parent->chunk_padding_bytes_),
+        bytes_transferred_(std::make_shared<size_t>(0)),
+        read_data_(new ReadData(parent, bytes_transferred_, asio::buffer(padding_))),
+        shared_conn_(parent->dn_) {}
+
+  virtual void Run(const Next &next) override {
+    LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadPadding::Run("
+                            << FMT_CONT_AND_PARENT_ADDR << ") called");
+
+    if (parent_->state_ != kReadPadding || !parent_->chunk_padding_bytes_) {
+      next(Status::OK());
+      return;
+    }
+
+    std::shared_ptr<DataNodeConnection> keep_conn_alive_ = shared_conn_;
+
+    auto h = [next, this, keep_conn_alive_](const Status &stat) {
+      Status status = stat;
+      if (status.ok()) {
+        assert(static_cast<int>(*bytes_transferred_) == parent_->chunk_padding_bytes_);
+        parent_->chunk_padding_bytes_ = 0;
+        parent_->state_ = kReadData;
+      }
+      if(parent_->event_handlers_) {
+        event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
+#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
+        if (status.ok() && event_resp.response_type() == event_response::kTest_Error) {
+          status = Status::Error("Test error");
+        }
+#endif
+      }
+      next(status);
+    };
+    read_data_->Run(h);
+  }
+
+private:
+  BlockReaderImpl *parent_;
+  std::vector<char> padding_;
+  std::shared_ptr<size_t> bytes_transferred_;
+  std::shared_ptr<continuation::Continuation> read_data_;
+  ReadPadding(const ReadPadding &) = delete;
+  ReadPadding &operator=(const ReadPadding &) = delete;
+
+  // Keep DNConnection alive.
+  std::shared_ptr<DataNodeConnection> shared_conn_;
+};
+
+
+struct BlockReaderImpl::AckRead : continuation::Continuation
+{
+  AckRead(BlockReaderImpl *parent) : parent_(parent), shared_conn_(parent->dn_) {}
+
+  virtual void Run(const Next &next) override {
+    LOG_TRACE(kBlockReader, << "BlockReaderImpl::AckRead::Run(" << 
FMT_CONT_AND_PARENT_ADDR << ") called");
+
+    if (parent_->bytes_to_read_ > 0) {
+      next(Status::OK());
+      return;
+    }
+
+    auto m = continuation::Pipeline<hadoop::hdfs::ClientReadStatusProto>::Create(parent_->cancel_state_);
+
+    m->state().set_status(parent_->options_.verify_checksum
+                              ? hadoop::hdfs::Status::CHECKSUM_OK
+                              : hadoop::hdfs::Status::SUCCESS);
+
+    m->Push(continuation::WriteDelimitedPBMessage(parent_->dn_, &m->state()));
+
+    std::shared_ptr<DataNodeConnection> keep_conn_alive_ = shared_conn_;
+
+    m->Run([this, next, keep_conn_alive_](const Status &stat, const hadoop::hdfs::ClientReadStatusProto &)
+    {
+      Status status = stat;
+      if (status.ok()) {
+        parent_->state_ = BlockReaderImpl::kFinished;
+      }
+      if(parent_->event_handlers_) {
+        event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
+#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
+        if (status.ok() && event_resp.response_type() == event_response::kTest_Error) {
+          status = Status::Error("Test error");
+        }
+#endif
+      }
+      next(status);
+    });
+  }
+
+private:
+  BlockReaderImpl *parent_;
+
+  // Keep DNConnection alive.
+  std::shared_ptr<DataNodeConnection> shared_conn_;
+};
+
+void BlockReaderImpl::AsyncReadPacket(const MutableBuffers &buffers,
+    const std::function<void(const Status &, size_t bytes_transferred)> &handler)
+{
+  assert(state_ != kOpen && "Not connected");
+
+  LOG_TRACE(kBlockReader, << "BlockReaderImpl::AsyncReadPacket called");
+
+  struct State {
+    std::shared_ptr<size_t> bytes_transferred;
+  };
+
+  auto m = continuation::Pipeline<State>::Create(cancel_state_);
+  m->state().bytes_transferred = std::make_shared<size_t>(0);
+
+  // Note: some of these continuations have nested pipelines.
+  m->Push(new ReadPacketHeader(this))
+      .Push(new ReadChecksum(this))
+      .Push(new ReadPadding(this))
+      .Push(new ReadData(
+          this, m->state().bytes_transferred, buffers))
+      .Push(new AckRead(this));
+
+  auto self = this->shared_from_this();
+  m->Run([self, handler](const Status &status, const State &state) {
+    handler(status, *state.bytes_transferred);
+  });
+}
+
+
+size_t BlockReaderImpl::ReadPacket(const MutableBuffers &buffers, Status *status)
+{
+  LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadPacket called");
+
+  size_t transferred = 0;
+  auto done = std::make_shared<std::promise<void>>();
+  auto future = done->get_future();
+  AsyncReadPacket(buffers,
+                  [status, &transferred, done](const Status &stat, size_t t) {
+                    *status = stat;
+                    transferred = t;
+                    done->set_value();
+                  });
+  future.wait();
+  return transferred;
+}
+
+
+struct BlockReaderImpl::RequestBlockContinuation : continuation::Continuation
+{
+  RequestBlockContinuation(BlockReader *reader, const std::string &client_name,
+        const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, uint64_t offset)
+      : reader_(reader), client_name_(client_name), length_(length), offset_(offset)
+  {
+    block_.CheckTypeAndMergeFrom(*block);
+  }
+
+  virtual void Run(const Next &next) override {
+    LOG_TRACE(kBlockReader, << "BlockReaderImpl::RequestBlockContinuation::Run("
+                            << FMT_CONT_AND_READER_ADDR << ") called");
+
+    reader_->AsyncRequestBlock(client_name_, &block_, length_, offset_, next);
+  }
+
+private:
+  BlockReader *reader_;
+  const std::string client_name_;
+  hadoop::hdfs::ExtendedBlockProto block_;
+  uint64_t length_;
+  uint64_t offset_;
+};
+
+struct BlockReaderImpl::ReadBlockContinuation : continuation::Continuation
+{
+  ReadBlockContinuation(BlockReader *reader, MutableBuffers buffer, size_t *transferred)
+      : reader_(reader), buffer_(buffer), buffer_size_(asio::buffer_size(buffer)),
+        transferred_(transferred) {}
+
+  virtual void Run(const Next &next) override {
+    LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadBlockContinuation::Run("
+                            << FMT_CONT_AND_READER_ADDR << ") called");
+    *transferred_ = 0;
+    next_ = next;
+    OnReadData(Status::OK(), 0);
+  }
+
+private:
+  BlockReader *reader_;
+  const MutableBuffers buffer_;
+  const size_t buffer_size_;
+  size_t *transferred_;
+  std::function<void(const Status &)> next_;
+
+  void OnReadData(const Status &status, size_t transferred) {
+    using std::placeholders::_1;
+    using std::placeholders::_2;
+    *transferred_ += transferred;
+    if (!status.ok()) {
+      next_(status);
+    } else if (*transferred_ >= buffer_size_) {
+      next_(status);
+    } else {
+      reader_->AsyncReadPacket(
+          asio::buffer(buffer_ + *transferred_, buffer_size_ - *transferred_),
+          std::bind(&ReadBlockContinuation::OnReadData, this, _1, _2));
+    }
+  }
+};
+
+void BlockReaderImpl::AsyncReadBlock(
+    const std::string & client_name,
+    const hadoop::hdfs::LocatedBlockProto &block,
+    size_t offset,
+    const MutableBuffers &buffers,
+    const std::function<void(const Status &, size_t)> handler)
+{
+  LOG_TRACE(kBlockReader, << "BlockReaderImpl::AsyncReadBlock("
+                          << FMT_THIS_ADDR << ") called");
+
+  auto m = continuation::Pipeline<size_t>::Create(cancel_state_);
+  size_t * bytesTransferred = &m->state();
+
+  size_t size = asio::buffer_size(buffers);
+
+  m->Push(new RequestBlockContinuation(this, client_name, &block.b(), size, offset))
+    .Push(new ReadBlockContinuation(this, buffers, bytesTransferred));
+
+  m->Run([handler] (const Status &status, const size_t totalBytesTransferred) {
+    handler(status, totalBytesTransferred);
+  });
+}
+
+void BlockReaderImpl::CancelOperation() {
+  LOG_TRACE(kBlockReader, << "BlockReaderImpl::CancelOperation("
+                          << FMT_THIS_ADDR << ") called");
+  /* just forward cancel to DNConnection */
+  dn_->Cancel();
+}
+
+}
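
For orientation, here is roughly how a caller might drive the synchronous
surface defined above. The setup of the DataNodeConnection, the
ExtendedBlockProto, and the cancellation handle happens elsewhere in the
library, so `dn`, `block`, and `cancel_state` below are placeholders, and the
sketch is illustrative rather than part of this patch:

    // Assumes: std::shared_ptr<DataNodeConnection> dn (already connected),
    // hadoop::hdfs::ExtendedBlockProto block (populated), and a CancelHandle
    // cancel_state obtained from the library's cancel tracker.
    BlockReaderOptions options;  // verify_checksum defaults to true
    auto reader = std::make_shared<BlockReaderImpl>(options, dn, cancel_state);

    size_t want = 65536;
    Status stat = reader->RequestBlock("libhdfs++_client", &block, want, /*offset=*/0);

    std::vector<char> data(want);
    size_t total = 0;
    while (stat.ok() && total < want) {
      // Each call consumes one packet from the DataNode's response stream.
      total += reader->ReadPacket(
          asio::buffer(data.data() + total, want - total), &stat);
    }

Note that BlockReaderImpl uses enable_shared_from_this internally
(AsyncReadPacket captures shared_from_this()), so it must be owned by a
shared_ptr as above.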

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h
new file mode 100644
index 0000000..b5cbdf5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef BLOCK_READER_H_
+#define BLOCK_READER_H_
+
+#include "hdfspp/status.h"
+#include "common/async_stream.h"
+#include "common/cancel_tracker.h"
+#include "common/new_delete.h"
+#include "datatransfer.pb.h"
+#include "connection/datanodeconnection.h"
+
+#include <memory>
+
+namespace hdfs {
+
+struct CacheStrategy {
+  bool drop_behind_specified;
+  bool drop_behind;
+  bool read_ahead_specified;
+  unsigned long long read_ahead;
+  CacheStrategy()
+      : drop_behind_specified(false), drop_behind(false),
+        read_ahead_specified(false), read_ahead(0) {}
+};
+
+enum DropBehindStrategy {
+  kUnspecified = 0,
+  kEnableDropBehind = 1,
+  kDisableDropBehind = 2,
+};
+
+enum EncryptionScheme {
+  kNone = 0,
+  kAESCTRNoPadding = 1,
+};
+
+struct BlockReaderOptions {
+  bool verify_checksum;
+  CacheStrategy cache_strategy;
+  EncryptionScheme encryption_scheme;
+
+  BlockReaderOptions()
+      : verify_checksum(true), encryption_scheme(EncryptionScheme::kNone) {}
+};
+
+/**
+ * Handles the operational state of requesting and reading a block (or portion of
+ * a block) from a DataNode.
+ *
+ * Threading model: not thread-safe.
+ * Lifecycle: should be created, used for a single read, then freed.
+ */
+class BlockReader {
+public:
+  MEMCHECKED_CLASS(BlockReader)
+  virtual void AsyncReadBlock(
+    const std::string & client_name,
+    const hadoop::hdfs::LocatedBlockProto &block, size_t offset,
+    const MutableBuffers &buffers,
+    const std::function<void(const Status &, size_t)> handler) = 0;
+
+  virtual void AsyncReadPacket(
+    const MutableBuffers &buffers,
+    const std::function<void(const Status &, size_t bytes_transferred)> &handler) = 0;
+
+  virtual void AsyncRequestBlock(
+    const std::string &client_name,
+    const hadoop::hdfs::ExtendedBlockProto *block,
+    uint64_t length,
+    uint64_t offset,
+    const std::function<void(Status)> &handler) = 0;
+
+  virtual void CancelOperation() = 0;
+};
+
+class BlockReaderImpl
+    : public BlockReader, public std::enable_shared_from_this<BlockReaderImpl> {
+public:
+  explicit BlockReaderImpl(const BlockReaderOptions &options,
+                           std::shared_ptr<DataNodeConnection> dn,
+                           CancelHandle cancel_state,
+                           std::shared_ptr<LibhdfsEvents> event_handlers = nullptr)
+      : dn_(dn), state_(kOpen), options_(options), chunk_padding_bytes_(0),
+        cancel_state_(cancel_state), event_handlers_(event_handlers.get()) {}
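+  // Note: only the raw pointer from event_handlers is retained; the caller
+  // must keep the LibhdfsEvents object alive for the lifetime of the reader.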
+
+  virtual void AsyncReadPacket(
+    const MutableBuffers &buffers,
+    const std::function<void(const Status &, size_t bytes_transferred)> &handler) override;
+
+  virtual void AsyncRequestBlock(
+    const std::string &client_name,
+    const hadoop::hdfs::ExtendedBlockProto *block,
+    uint64_t length,
+    uint64_t offset,
+    const std::function<void(Status)> &handler) override;
+
+  virtual void AsyncReadBlock(
+    const std::string & client_name,
+    const hadoop::hdfs::LocatedBlockProto &block, size_t offset,
+    const MutableBuffers &buffers,
+    const std::function<void(const Status &, size_t)> handler) override;
+
+  virtual void CancelOperation() override;
+
+  size_t ReadPacket(const MutableBuffers &buffers, Status *status);
+
+  Status RequestBlock(
+    const std::string &client_name,
+    const hadoop::hdfs::ExtendedBlockProto *block,
+    uint64_t length,
+    uint64_t offset);
+
+private:
+  struct RequestBlockContinuation;
+  struct ReadBlockContinuation;
+
+  struct ReadPacketHeader;
+  struct ReadChecksum;
+  struct ReadPadding;
+  struct ReadData;
+  struct AckRead;
+  enum State {
+    kOpen,
+    kReadPacketHeader,
+    kReadChecksum,
+    kReadPadding,
+    kReadData,
+    kFinished,
+  };
+
+  std::shared_ptr<DataNodeConnection> dn_;
+  hadoop::hdfs::PacketHeaderProto header_;
+  State state_;
+  BlockReaderOptions options_;
+  size_t packet_len_;
+  int packet_data_read_bytes_;
+  int chunk_padding_bytes_;
+  long long bytes_to_read_;
+  std::vector<char> checksum_;
+  CancelHandle cancel_state_;
+  LibhdfsEvents* event_handlers_;
+};
+}
+
+#endif

