http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/bad_datanode_tracker.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/bad_datanode_tracker.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/bad_datanode_tracker.h
new file mode 100644
index 0000000..1d596ad
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/bad_datanode_tracker.h
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef LIBHDFSPP_BADDATANODETRACKER_H
+#define LIBHDFSPP_BADDATANODETRACKER_H
+
+#include <mutex>
+#include <chrono>
+#include <map>
+#include <string>
+#include <set>
+
+#include "hdfspp/options.h"
+#include "hdfspp/hdfspp.h"
+
+namespace hdfs {
+
+/**
+ * ExclusionSet is a simple override that can be filled with known
+ * bad node UUIDs and passed to AsyncPreadSome.
+ **/
+class ExclusionSet : public NodeExclusionRule {
+ public:
+  ExclusionSet(const std::set<std::string>& excluded);
+  virtual ~ExclusionSet();
+  virtual bool IsBadNode(const std::string& node_uuid);
+
+ private:
+  std::set<std::string> excluded_;
+};
+
+/**
+ * BadDataNodeTracker keeps a timestamped list of datanodes that have
+ * failed during past operations.  Entries present in this list will
+ * not be used for new requests.  Entries will be evicted from the list
+ * after a period of time has elapsed; the default is 10 minutes.
+ */
+class BadDataNodeTracker : public NodeExclusionRule {
+ public:
+  BadDataNodeTracker(const Options& options = Options());
+  virtual ~BadDataNodeTracker();
+  /* add a bad DN to the list */
+  void AddBadNode(const std::string& dn);
+  /* check if a node should be excluded */
+  virtual bool IsBadNode(const std::string& dn);
+  /* only for tests; shift the clock forward by t milliseconds */
+  void TEST_set_clock_shift(int t);
+
+ private:
+  typedef std::chrono::steady_clock Clock;
+  typedef std::chrono::time_point<Clock> TimePoint;
+  bool TimeoutExpired(const TimePoint& t);
+  /* after timeout_duration_ elapses remove DN */
+  const unsigned int timeout_duration_; /* milliseconds */
+  std::map<std::string, TimePoint> datanodes_;
+  std::mutex datanodes_update_lock_;
+  int test_clock_shift_;
+};
+}
+#endif
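
For orientation, a brief usage sketch (not part of the patch, includes elided): a caller-supplied ExclusionSet can be handed to FileHandleImpl::AsyncPreadSome (declared in filehandle.h below) to steer a single read away from nodes the caller already knows are bad; the BadDataNodeTracker fills the same role automatically when no rule is passed. The file handle, buffer, and UUID below are assumed to exist.

    // Sketch only: `file` is an already-opened hdfs::FileHandleImpl and `buf`
    // a caller-owned buffer of `len` bytes; the UUID string is a placeholder.
    void read_avoiding_known_bad_nodes(hdfs::FileHandleImpl *file, char *buf, size_t len) {
      std::set<std::string> known_bad = {"uuid-of-a-bad-datanode"};
      auto exclusions = std::make_shared<hdfs::ExclusionSet>(known_bad);

      file->AsyncPreadSome(/*offset=*/0, asio::buffer(buf, len), exclusions,
          [](const hdfs::Status &status, const std::string &dn_id, size_t nread) {
            // dn_id identifies the datanode that served (or failed) the read;
            // on failure it is the value a BadDataNodeTracker would record.
            (void)status; (void)dn_id; (void)nread;
          });
    }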

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc
new file mode 100644
index 0000000..ba702b0
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc
@@ -0,0 +1,370 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "filehandle.h"
+#include "common/continuation/continuation.h"
+#include "common/logging.h"
+#include "connection/datanodeconnection.h"
+#include "reader/block_reader.h"
+#include "hdfspp/events.h"
+
+#include <future>
+#include <tuple>
+
+#define FMT_THIS_ADDR "this=" << (void*)this
+
+namespace hdfs {
+
+using ::hadoop::hdfs::LocatedBlocksProto;
+
+FileHandle::~FileHandle() {}
+
+FileHandleImpl::FileHandleImpl(const std::string & cluster_name,
+                               const std::string & path,
+                               ::asio::io_service *io_service, const std::string &client_name,
+                                 const std::shared_ptr<const struct FileInfo> file_info,
+                                 std::shared_ptr<BadDataNodeTracker> bad_data_nodes,
+                                 std::shared_ptr<LibhdfsEvents> event_handlers)
+    : cluster_name_(cluster_name), path_(path), io_service_(io_service), client_name_(client_name), file_info_(file_info),
+      bad_node_tracker_(bad_data_nodes), offset_(0), cancel_state_(CancelTracker::New()), event_handlers_(event_handlers), bytes_read_(0) {
+  LOG_TRACE(kFileHandle, << "FileHandleImpl::FileHandleImpl("
+                         << FMT_THIS_ADDR << ", ...) called");
+
+}
+
+void FileHandleImpl::PositionRead(
+    void *buf, size_t buf_size, uint64_t offset,
+    const std::function<void(const Status &, size_t)> &handler) {
+  LOG_DEBUG(kFileHandle, << "FileHandleImpl::PositionRead("
+                         << FMT_THIS_ADDR << ", buf=" << buf
+                         << ", buf_size=" << std::to_string(buf_size) << ") called");
+
+  /* prevent usage after cancelation */
+  if(cancel_state_->is_canceled()) {
+    handler(Status::Canceled(), 0);
+    return;
+  }
+
+  auto callback = [this, handler](const Status &status,
+                                  const std::string &contacted_datanode,
+                                  size_t bytes_read) {
+    /* determine if DN gets marked bad */
+    if (ShouldExclude(status)) {
+      bad_node_tracker_->AddBadNode(contacted_datanode);
+    }
+
+    bytes_read_ += bytes_read;
+    handler(status, bytes_read);
+  };
+
+  AsyncPreadSome(offset, asio::buffer(buf, buf_size), bad_node_tracker_, callback);
+}
+
+Status FileHandleImpl::PositionRead(void *buf, size_t buf_size, off_t offset, size_t *bytes_read) {
+  LOG_DEBUG(kFileHandle, << "FileHandleImpl::[sync]PositionRead("
+                         << FMT_THIS_ADDR << ", buf=" << buf
+                         << ", buf_size=" << std::to_string(buf_size)
+                         << ", offset=" << offset << ") called");
+
+  auto callstate = std::make_shared<std::promise<std::tuple<Status, size_t>>>();
+  std::future<std::tuple<Status, size_t>> future(callstate->get_future());
+
+  /* wrap async call with promise/future to make it blocking */
+  auto callback = [callstate](const Status &s, size_t bytes) {
+    callstate->set_value(std::make_tuple(s,bytes));
+  };
+
+  PositionRead(buf, buf_size, offset, callback);
+
+  /* wait for async to finish */
+  auto returnstate = future.get();
+  auto stat = std::get<0>(returnstate);
+
+  if (!stat.ok()) {
+    return stat;
+  }
+
+  *bytes_read = std::get<1>(returnstate);
+  return stat;
+}
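
The synchronous PositionRead above is just the asynchronous overload wrapped in a std::promise/std::future pair. Reduced to a free-standing sketch (the helper name is invented and hdfs::Status is assumed to come from the public headers), the shape of that wrapper is:

    // Sketch only: run any (Status, size_t) style async call and block on it.
    template <typename AsyncFn>
    std::tuple<hdfs::Status, size_t> run_blocking(AsyncFn async_call) {
      auto state = std::make_shared<std::promise<std::tuple<hdfs::Status, size_t>>>();
      std::future<std::tuple<hdfs::Status, size_t>> result = state->get_future();

      // Start the async operation; its callback parks the result in the promise.
      async_call([state](const hdfs::Status &s, size_t bytes) {
        state->set_value(std::make_tuple(s, bytes));
      });

      return result.get();  // blocks until the callback fires on an io_service thread
    }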
+
+Status FileHandleImpl::Read(void *buf, size_t buf_size, size_t *bytes_read) {
+  LOG_DEBUG(kFileHandle, << "FileHandleImpl::Read("
+                         << FMT_THIS_ADDR << ", buf=" << buf
+                         << ", buf_size=" << std::to_string(buf_size) << ") called");
+
+  Status stat = PositionRead(buf, buf_size, offset_, bytes_read);
+  if(!stat.ok()) {
+    return stat;
+  }
+
+  offset_ += *bytes_read;
+  return Status::OK();
+}
+
+Status FileHandleImpl::Seek(off_t *offset, std::ios_base::seekdir whence) {
+  LOG_DEBUG(kFileHandle, << "FileHandleImpl::Seek("
+                         << ", offset=" << *offset << ", ...) called");
+
+  if(cancel_state_->is_canceled()) {
+    return Status::Canceled();
+  }
+
+  off_t new_offset = -1;
+
+  switch (whence) {
+    case std::ios_base::beg:
+      new_offset = *offset;
+      break;
+    case std::ios_base::cur:
+      new_offset = offset_ + *offset;
+      break;
+    case std::ios_base::end:
+      new_offset = file_info_->file_length_ + *offset;
+      break;
+    default:
+      /* unsupported */
+      return Status::InvalidArgument("Invalid Seek whence argument");
+  }
+
+  if(!CheckSeekBounds(new_offset)) {
+    return Status::InvalidArgument("Seek offset out of bounds");
+  }
+  offset_ = new_offset;
+
+  *offset = offset_;
+  return Status::OK();
+}
+
+/* return false if seek will be out of bounds */
+bool FileHandleImpl::CheckSeekBounds(ssize_t desired_position) {
+  ssize_t file_length = file_info_->file_length_;
+
+  if (desired_position < 0 || desired_position > file_length) {
+    return false;
+  }
+
+  return true;
+}
+
+/*
+ * Note that this method must be thread-safe w.r.t. the unsafe operations
+ * occurring on the FileHandle
+ */
+void FileHandleImpl::AsyncPreadSome(
+    size_t offset, const MutableBuffers &buffers,
+    std::shared_ptr<NodeExclusionRule> excluded_nodes,
+    const std::function<void(const Status &, const std::string &, size_t)> handler) {
+  using ::hadoop::hdfs::DatanodeInfoProto;
+  using ::hadoop::hdfs::LocatedBlockProto;
+
+  LOG_DEBUG(kFileHandle, << "FileHandleImpl::AsyncPreadSome("
+                         << FMT_THIS_ADDR << ", ...) called");
+
+  if(cancel_state_->is_canceled()) {
+    handler(Status::Canceled(), "", 0);
+    return;
+  }
+
+  if(offset == file_info_->file_length_) {
+    handler(Status::OK(), "", 0);
+    return;
+  } else if(offset > file_info_->file_length_){
+    handler(Status::InvalidOffset("AsyncPreadSome: trying to begin a read past the EOF"), "", 0);
+    return;
+  }
+
+  /**
+   *  Note: block and chosen_dn will end up pointing to things inside
+   *  the blocks_ vector.  They shouldn't be directly deleted.
+   **/
+  auto block = std::find_if(
+      file_info_->blocks_.begin(), file_info_->blocks_.end(), [offset](const LocatedBlockProto &p) {
+        return p.offset() <= offset && offset < p.offset() + p.b().numbytes();
+      });
+
+  if (block == file_info_->blocks_.end()) {
+    LOG_WARN(kFileHandle, << "FileHandleImpl::AsyncPreadSome(" << FMT_THIS_ADDR
+                          << ", ...) Cannot find corresponding blocks");
+    handler(Status::InvalidArgument("Cannot find corresponding blocks"), "", 0);
+    return;
+  }
+
+  /**
+   * If user supplies a rule use it, otherwise use the tracker.
+   * User is responsible for making sure one of them isn't null.
+   **/
+  std::shared_ptr<NodeExclusionRule> rule =
+      excluded_nodes != nullptr ? excluded_nodes : bad_node_tracker_;
+
+  auto datanodes = block->locs();
+  auto it = std::find_if(datanodes.begin(), datanodes.end(),
+                         [rule](const DatanodeInfoProto &dn) {
+                           return !rule->IsBadNode(dn.id().datanodeuuid());
+                         });
+
+  if (it == datanodes.end()) {
+    LOG_WARN(kFileHandle, << "FileHandleImpl::AsyncPreadSome("
+                          << FMT_THIS_ADDR << ", ...) No datanodes available");
+
+    handler(Status::ResourceUnavailable("No datanodes available"), "", 0);
+    return;
+  }
+
+  DatanodeInfoProto &chosen_dn = *it;
+
+  std::string dnIpAddr = chosen_dn.id().ipaddr();
+  std::string dnHostName = chosen_dn.id().hostname();
+
+  uint64_t offset_within_block = offset - block->offset();
+  uint64_t size_within_block = std::min<uint64_t>(
+      block->b().numbytes() - offset_within_block, asio::buffer_size(buffers));
+
+  LOG_DEBUG(kFileHandle, << "FileHandleImpl::AsyncPreadSome("
+            << FMT_THIS_ADDR << ", ...) Datanode hostname=" << dnHostName << ", IP Address=" << dnIpAddr
+            << ", file path=\"" << path_ << "\", offset=" << std::to_string(offset) << ", read size=" << size_within_block);
+
+  // This is where we will put the logic for re-using a DN connection; we can
+  //    steal the FileHandle's dn and put it back when we're done
+  std::shared_ptr<DataNodeConnection> dn = CreateDataNodeConnection(io_service_, chosen_dn, &block->blocktoken());
+  std::string dn_id = dn->uuid_;
+  std::string client_name = client_name_;
+
+  // Wrap the DN in a block reader to handle the state and logic of the
+  //    block request protocol
+  std::shared_ptr<BlockReader> reader;
+  reader = CreateBlockReader(BlockReaderOptions(), dn, event_handlers_);
+
+  // Lambdas cannot capture copies of member variables so we'll make explicit
+  //    copies for it
+  auto event_handlers = event_handlers_;
+  auto path = path_;
+  auto cluster_name = cluster_name_;
+
+  auto read_handler = [reader, event_handlers, cluster_name, path, dn_id, handler](const Status & status, size_t transferred) {
+    event_response event_resp = event_handlers->call(FILE_DN_READ_EVENT, cluster_name.c_str(), path.c_str(), transferred);
+#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
+    if (event_resp.response_type() == event_response::kTest_Error) {
+      handler(event_resp.status(), dn_id, transferred);
+      return;
+    }
+#endif
+
+    handler(status, dn_id, transferred);
+  };
+
+  auto connect_handler = [handler,event_handlers,cluster_name,path,read_handler,block,offset_within_block,size_within_block, buffers, reader, dn_id, client_name]
+          (Status status, std::shared_ptr<DataNodeConnection> dn) {
+    (void)dn;
+    event_response event_resp = event_handlers->call(FILE_DN_CONNECT_EVENT, cluster_name.c_str(), path.c_str(), 0);
+#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
+    if (event_resp.response_type() == event_response::kTest_Error) {
+      status = event_resp.status();
+    }
+#endif
+
+    if (status.ok()) {
+      reader->AsyncReadBlock(
+          client_name, *block, offset_within_block,
+          asio::buffer(buffers, size_within_block), read_handler);
+    } else {
+      handler(status, dn_id, 0);
+    }
+  };
+
+  dn->Connect(connect_handler);
+
+  return;
+}
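
A worked example of the offset arithmetic above (numbers invented for illustration): with a 128 MB block starting at file offset 0 and a 1 MB buffer positioned at file offset 127.5 MB, offset_within_block is 127.5 MB and size_within_block is min(0.5 MB, 1 MB) = 0.5 MB, so the read stops at the block boundary and the callback reports a short read of 0.5 MB rather than crossing into the next block.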
+
+std::shared_ptr<BlockReader> FileHandleImpl::CreateBlockReader(const BlockReaderOptions &options,
+                                                               std::shared_ptr<DataNodeConnection> dn,
+                                                               std::shared_ptr<LibhdfsEvents> event_handlers)
+{
+  std::shared_ptr<BlockReader> reader = std::make_shared<BlockReaderImpl>(options, dn, cancel_state_, event_handlers);
+
+  LOG_TRACE(kFileHandle, << "FileHandleImpl::CreateBlockReader(" << FMT_THIS_ADDR
+                         << ", ..., dnconn=" << dn.get()
+                         << ") called.  New BlockReader = " << reader.get());
+
+  readers_.AddReader(reader);
+  return reader;
+}
+
+std::shared_ptr<DataNodeConnection> FileHandleImpl::CreateDataNodeConnection(
+    ::asio::io_service * io_service,
+    const ::hadoop::hdfs::DatanodeInfoProto & dn,
+    const hadoop::common::TokenProto * token) {
+  LOG_TRACE(kFileHandle, << "FileHandleImpl::CreateDataNodeConnection("
+                         << FMT_THIS_ADDR << ", ...) called");
+  return std::make_shared<DataNodeConnectionImpl>(io_service, dn, token, event_handlers_.get());
+}
+
+std::shared_ptr<LibhdfsEvents> FileHandleImpl::get_event_handlers() {
+  return event_handlers_;
+}
+
+void FileHandleImpl::CancelOperations() {
+  LOG_INFO(kFileHandle, << "FileHandleImpl::CancelOperations("
+                        << FMT_THIS_ADDR << ") called");
+
+  cancel_state_->set_canceled();
+
+  /* Push update to BlockReaders that may be hung in an asio call */
+  std::vector<std::shared_ptr<BlockReader>> live_readers = readers_.GetLiveReaders();
+  for(auto reader : live_readers) {
+    reader->CancelOperation();
+  }
+}
+
+void FileHandleImpl::SetFileEventCallback(file_event_callback callback) {
+  std::shared_ptr<LibhdfsEvents> new_event_handlers;
+  if (event_handlers_) {
+    new_event_handlers = std::make_shared<LibhdfsEvents>(*event_handlers_);
+  } else {
+    new_event_handlers = std::make_shared<LibhdfsEvents>();
+  }
+  new_event_handlers->set_file_callback(callback);
+  event_handlers_ = new_event_handlers;
+}
+
+
+
+bool FileHandle::ShouldExclude(const Status &s) {
+  if (s.ok()) {
+    return false;
+  }
+
+  switch (s.code()) {
+    /* client side resource exhaustion */
+    case Status::kResourceUnavailable:
+    case Status::kOperationCanceled:
+      return false;
+    case Status::kInvalidArgument:
+    case Status::kUnimplemented:
+    case Status::kException:
+    default:
+      return true;
+  }
+}
+
+uint64_t FileHandleImpl::get_bytes_read() { return bytes_read_.load(); }
+
+void FileHandleImpl::clear_bytes_read() { bytes_read_.store(0); }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h
new file mode 100644
index 0000000..4135156
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBHDFSPP_LIB_FS_FILEHANDLE_H_
+#define LIBHDFSPP_LIB_FS_FILEHANDLE_H_
+
+#include "common/hdfs_ioservice.h"
+#include "common/async_stream.h"
+#include "common/cancel_tracker.h"
+#include "common/libhdfs_events_impl.h"
+#include "common/new_delete.h"
+#include "reader/fileinfo.h"
+#include "reader/readergroup.h"
+
+#include "asio.hpp"
+#include "bad_datanode_tracker.h"
+#include "ClientNamenodeProtocol.pb.h"
+
+#include <mutex>
+#include <iostream>
+
+namespace hdfs {
+
+class BlockReader;
+struct BlockReaderOptions;
+class DataNodeConnection;
+
+/*
+ * FileHandle: coordinates operations on a particular file in HDFS
+ *
+ * Threading model: not thread-safe; consumers and io_service should not call
+ *    concurrently.  PositionRead is the exception; it can be
+ *    called concurrently and repeatedly.
+ * Lifetime: pointer returned to consumer by FileSystem::Open.  Consumer is
+ *    responsible for freeing the object.
+ */
+class FileHandleImpl : public FileHandle {
+public:
+  MEMCHECKED_CLASS(FileHandleImpl)
+  FileHandleImpl(const std::string & cluster_name,
+                 const std::string & path,
+                 ::asio::io_service *io_service, const std::string &client_name,
+                  const std::shared_ptr<const struct FileInfo> file_info,
+                  std::shared_ptr<BadDataNodeTracker> bad_data_nodes,
+                  std::shared_ptr<LibhdfsEvents> event_handlers);
+
+  /*
+   * Reads the file at the specified offset into the buffer.
+   * bytes_read reports the number of bytes successfully read, on both
+   * success and error. Status::InvalidOffset is returned when trying to begin
+   * a read past the EOF.
+   */
+  void PositionRead(
+    void *buf,
+    size_t buf_size,
+    uint64_t offset,
+    const std::function<void(const Status &status, size_t bytes_read)> &handler
+    ) override;
+
+  /**
+   *  Reads the file at the specified offset into the buffer.
+   *  @param buf        output buffer
+   *  @param buf_size   size of the output buffer
+   *  @param offset     offset at which to start reading
+   *  @param bytes_read number of bytes successfully read
+   */
+  Status PositionRead(void *buf, size_t buf_size, off_t offset, size_t *bytes_read) override;
+  Status Read(void *buf, size_t buf_size, size_t *bytes_read) override;
+  Status Seek(off_t *offset, std::ios_base::seekdir whence) override;
+
+
+  /*
+   * Reads some amount of data into the buffer.  Will attempt to find the best
+   * datanode and read data from it.
+   *
+   * If an error occurs during connection or transfer, the callback will be
+   * called with bytes_read equal to the number of bytes successfully transferred.
+   * If no data nodes can be found, status will be Status::ResourceUnavailable.
+   * If trying to begin a read past the EOF, status will be Status::InvalidOffset.
+   *
+   */
+  void AsyncPreadSome(size_t offset, const MutableBuffers &buffers,
+                      std::shared_ptr<NodeExclusionRule> excluded_nodes,
+                      const std::function<void(const Status &status,
+                      const std::string &dn_id, size_t bytes_read)> handler);
+
+  /**
+   *  Cancels all operations instantiated from this FileHandle.
+   *  Will set a flag to abort continuation pipelines when they try to move to the next step.
+   *  Closes TCP connections to Datanode in order to abort pipelines waiting on slow IO.
+   **/
+  virtual void CancelOperations(void) override;
+
+  virtual void SetFileEventCallback(file_event_callback callback) override;
+
+  /**
+   * Ephemeral objects created by the filehandle will need to get the event
+   * handler registry owned by the FileSystem.
+   **/
+  std::shared_ptr<LibhdfsEvents> get_event_handlers();
+
+  /* how many bytes have been successfully read */
+  virtual uint64_t get_bytes_read() override;
+
+  /* resets the number of bytes read to zero */
+  virtual void clear_bytes_read() override;
+
+protected:
+  virtual std::shared_ptr<BlockReader> CreateBlockReader(const BlockReaderOptions &options,
+                                                         std::shared_ptr<DataNodeConnection> dn,
+                                                         std::shared_ptr<hdfs::LibhdfsEvents> event_handlers);
+  virtual std::shared_ptr<DataNodeConnection> CreateDataNodeConnection(
+      ::asio::io_service *io_service,
+      const ::hadoop::hdfs::DatanodeInfoProto & dn,
+      const hadoop::common::TokenProto * token);
+private:
+  const std::string cluster_name_;
+  const std::string path_;
+  ::asio::io_service * const io_service_;
+  const std::string client_name_;
+  const std::shared_ptr<const struct FileInfo> file_info_;
+  std::shared_ptr<BadDataNodeTracker> bad_node_tracker_;
+  bool CheckSeekBounds(ssize_t desired_position);
+  off_t offset_;
+  CancelHandle cancel_state_;
+  ReaderGroup readers_;
+  std::shared_ptr<LibhdfsEvents> event_handlers_;
+  std::atomic<uint64_t> bytes_read_;
+};
+
+}
+
+#endif
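
A consumer-side sketch of the blocking interface declared above (not part of the patch; assumes a FileHandle obtained from FileSystem::Open and <vector> included):

    // Sketch only: drain a file with the synchronous Read(), which advances
    // the handle's internal offset on each successful call and reports 0
    // bytes once the offset reaches the end of the file.
    std::vector<char> read_whole_file(hdfs::FileHandle *file) {
      std::vector<char> data;
      char chunk[64 * 1024];
      size_t nread = 0;
      do {
        hdfs::Status status = file->Read(chunk, sizeof(chunk), &nread);
        if (!status.ok()) {
          break;  // a real caller would surface the error
        }
        data.insert(data.end(), chunk, chunk + nread);
      } while (nread > 0);
      return data;
    }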

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
new file mode 100644
index 0000000..56d02d8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
@@ -0,0 +1,859 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "filesystem.h"
+
+#include "common/namenode_info.h"
+
+#include <functional>
+#include <limits>
+#include <future>
+#include <tuple>
+#include <iostream>
+#include <pwd.h>
+#include <fnmatch.h>
+
+#define FMT_THIS_ADDR "this=" << (void*)this
+
+namespace hdfs {
+
+static const char kNamenodeProtocol[] = "org.apache.hadoop.hdfs.protocol.ClientProtocol";
+static const int kNamenodeProtocolVersion = 1;
+
+using ::asio::ip::tcp;
+
+static constexpr uint16_t kDefaultPort = 8020;
+
+// forward declarations
+const std::string get_effective_user_name(const std::string &);
+
+uint32_t FileSystem::GetDefaultFindMaxDepth() {
+  return std::numeric_limits<uint32_t>::max();
+}
+
+uint16_t FileSystem::GetDefaultPermissionMask() {
+  return 0755;
+}
+
+Status FileSystem::CheckValidPermissionMask(uint16_t permissions) {
+  if (permissions > 01777) {
+    std::stringstream errormsg;
+    errormsg << "CheckValidPermissionMask: argument 'permissions' is " << std::oct
+        << std::showbase << permissions << " (should be between 0 and 01777)";
+    return Status::InvalidArgument(errormsg.str().c_str());
+  }
+  return Status::OK();
+}
+
+Status FileSystem::CheckValidReplication(uint16_t replication) {
+  if (replication < 1 || replication > 512) {
+    std::stringstream errormsg;
+    errormsg << "CheckValidReplication: argument 'replication' is "
+        << replication << " (should be between 1 and 512)";
+    return Status::InvalidArgument(errormsg.str().c_str());
+  }
+  return Status::OK();
+}
+
+FileSystem::~FileSystem() {}
+
+/*****************************************************************************
+ *                    FILESYSTEM BASE CLASS
+ ****************************************************************************/
+
+FileSystem *FileSystem::New(
+    IoService *&io_service, const std::string &user_name, const Options &options) {
+  return new FileSystemImpl(io_service, user_name, options);
+}
+
+FileSystem *FileSystem::New(
+    std::shared_ptr<IoService> io_service, const std::string &user_name, const Options &options) {
+  return new FileSystemImpl(io_service, user_name, options);
+}
+
+FileSystem *FileSystem::New() {
+  // No, this pointer won't be leaked.  The FileSystem takes ownership.
+  std::shared_ptr<IoService> io_service = IoService::MakeShared();
+  if(!io_service)
+    return nullptr;
+  int thread_count = io_service->InitDefaultWorkers();
+  if(thread_count < 1)
+    return nullptr;
+
+  std::string user_name = get_effective_user_name("");
+  Options options;
+  return new FileSystemImpl(io_service, user_name, options);
+}
+
+/*****************************************************************************
+ *                    FILESYSTEM IMPLEMENTATION
+ ****************************************************************************/
+
+const std::string get_effective_user_name(const std::string &user_name) {
+  if (!user_name.empty())
+    return user_name;
+
+  // If no user name was provided, try the HADOOP_USER_NAME and USER environment
+  //    variables
+  const char * env = getenv("HADOOP_USER_NAME");
+  if (env) {
+    return env;
+  }
+
+  env = getenv("USER");
+  if (env) {
+    return env;
+  }
+
+  // If running on POSIX, use the currently logged in user
+#if defined(_POSIX_VERSION)
+  uid_t uid = geteuid();
+  struct passwd *pw = getpwuid(uid);
+  if (pw && pw->pw_name)
+  {
+    return pw->pw_name;
+  }
+#endif
+
+  return "unknown_user";
+}
+
+FileSystemImpl::FileSystemImpl(IoService *&io_service, const std::string &user_name, const Options &options) :
+     io_service_(static_cast<IoServiceImpl *>(io_service)), options_(options),
+     client_name_(GetRandomClientName()),
+     nn_(
+       &io_service_->io_service(), options, client_name_,
+       get_effective_user_name(user_name), kNamenodeProtocol,
+       kNamenodeProtocolVersion
+     ),
+     bad_node_tracker_(std::make_shared<BadDataNodeTracker>()),
+     event_handlers_(std::make_shared<LibhdfsEvents>())
+{
+
+  LOG_DEBUG(kFileSystem, << "FileSystemImpl::FileSystemImpl("
+                         << FMT_THIS_ADDR << ") called");
+
+  // Poor man's move
+  io_service = nullptr;
+
+  unsigned int running_workers = 0;
+  if(options.io_threads_ < 1) {
+    LOG_DEBUG(kFileSystem, << "FileSystemImpl::FileSystemImpl Initializing default number of worker threads");
+    running_workers = io_service_->InitDefaultWorkers();
+  } else {
+    LOG_DEBUG(kFileSystem, << "FileSystemImpl::FileSystemImpl Initializing " << options_.io_threads_ << " worker threads.");
+    running_workers = io_service_->InitWorkers(options_.io_threads_);
+  }
+
+  if(running_workers < 1) {
+    LOG_WARN(kFileSystem, << "FileSystemImpl::FileSystemImpl was unable to start worker threads");
+  }
+}
+
+FileSystemImpl::FileSystemImpl(std::shared_ptr<IoService> io_service, const std::string& user_name, const Options &options) :
+     io_service_(std::static_pointer_cast<IoServiceImpl>(io_service)), options_(options),
+     client_name_(GetRandomClientName()),
+     nn_(
+       &io_service_->io_service(), options, client_name_,
+       get_effective_user_name(user_name), kNamenodeProtocol,
+       kNamenodeProtocolVersion
+     ),
+     bad_node_tracker_(std::make_shared<BadDataNodeTracker>()),
+     event_handlers_(std::make_shared<LibhdfsEvents>())
+{
+  LOG_DEBUG(kFileSystem, << "FileSystemImpl::FileSystemImpl("
+                         << FMT_THIS_ADDR << ", shared IoService@" << io_service_.get() << ") called");
+  int worker_thread_count = io_service_->get_worker_thread_count();
+  if(worker_thread_count < 1) {
+    LOG_WARN(kFileSystem, << "FileSystemImpl::FileSystemImpl IoService provided doesn't have any worker threads. "
+                          << "It needs at least 1 worker to connect to an HDFS cluster.")
+  } else {
+    LOG_DEBUG(kFileSystem, << "FileSystemImpl::FileSystemImpl using " << worker_thread_count << " worker threads.");
+  }
+}
+
+FileSystemImpl::~FileSystemImpl() {
+  LOG_TRACE(kFileSystem, << "FileSystemImpl::~FileSystemImpl("
+                         << FMT_THIS_ADDR << ") called");
+
+  /**
+   * Note: IoService must be stopped before getting rid of worker threads.
+   * Once worker threads are joined and deleted the service can be deleted.
+   **/
+  io_service_->Stop();
+}
+
+void FileSystemImpl::Connect(const std::string &server,
+                             const std::string &service,
+                             const std::function<void(const Status &, FileSystem * fs)> &handler) {
+  LOG_INFO(kFileSystem, << "FileSystemImpl::Connect(" << FMT_THIS_ADDR
+                        << ", server=" << server << ", service="
+                        << service << ") called");
+  connect_callback_.SetCallback(handler);
+
+  /* IoService::New can return nullptr */
+  if (!io_service_) {
+    handler (Status::Error("Null IoService"), this);
+  }
+
+  // DNS lookup here for namenode(s)
+  std::vector<ResolvedNamenodeInfo> resolved_namenodes;
+
+  auto name_service = options_.services.find(server);
+  if(name_service != options_.services.end()) {
+    cluster_name_ = name_service->first;
+    resolved_namenodes = BulkResolve(&io_service_->io_service(), name_service->second);
+  } else {
+    cluster_name_ = server + ":" + service;
+
+    // tmp namenode info just to get this in the right format for BulkResolve
+    NamenodeInfo tmp_info;
+    try {
+      tmp_info.uri = URI::parse_from_string("hdfs://" + cluster_name_);
+    } catch (const uri_parse_error& e) {
+      LOG_ERROR(kFileSystem, << "Unable to use URI for cluster " << cluster_name_);
+      handler(Status::Error(("Invalid namenode " + cluster_name_ + " in config").c_str()), this);
+    }
+
+    resolved_namenodes = BulkResolve(&io_service_->io_service(), {tmp_info});
+  }
+
+  for(unsigned int i=0;i<resolved_namenodes.size();i++) {
+    LOG_DEBUG(kFileSystem, << "Resolved Namenode");
+    LOG_DEBUG(kFileSystem, << resolved_namenodes[i].str());
+  }
+
+
+  nn_.Connect(cluster_name_, /*server, service*/ resolved_namenodes, [this](const Status & s) {
+    connect_callback_.GetCallback()(s, this);
+  });
+}
+
+
+void FileSystemImpl::ConnectToDefaultFs(const std::function<void(const Status &, FileSystem *)> &handler) {
+  std::string scheme = options_.defaultFS.get_scheme();
+  if (strcasecmp(scheme.c_str(), "hdfs") != 0) {
+    std::string error_message;
+    error_message += "defaultFS of [" + options_.defaultFS.str() + "] is not supported";
+    handler(Status::InvalidArgument(error_message.c_str()), nullptr);
+    return;
+  }
+
+  std::string host = options_.defaultFS.get_host();
+  if (host.empty()) {
+    handler(Status::InvalidArgument("defaultFS must specify a hostname"), nullptr);
+    return;
+  }
+
+  int16_t port = options_.defaultFS.get_port_or_default(kDefaultPort);
+  std::string port_as_string = std::to_string(port);
+
+  Connect(host, port_as_string, handler);
+}
+
+int FileSystemImpl::AddWorkerThread() {
+  LOG_DEBUG(kFileSystem, << "FileSystemImpl::AddWorkerThread("
+                                  << FMT_THIS_ADDR << ") called."
+                                  << " Existing thread count = " << WorkerThreadCount());
+
+  if(!io_service_)
+    return -1;
+
+  io_service_->AddWorkerThread();
+  return 1;
+}
+
+int FileSystemImpl::WorkerThreadCount() {
+  if(!io_service_) {
+    return -1;
+  } else {
+    return io_service_->get_worker_thread_count();
+  }
+}
+
+bool FileSystemImpl::CancelPendingConnect() {
+  if(connect_callback_.IsCallbackAccessed()) {
+    // Temp fix for failover hangs: allow CancelPendingConnect to be called so
+    // it can push a flag through the RPC engine
+    LOG_DEBUG(kFileSystem, << "FileSystemImpl@" << this << "::CancelPendingConnect called after Connect completed");
+    return nn_.CancelPendingConnect();
+  }
+
+  if(!connect_callback_.IsCallbackSet()) {
+    LOG_DEBUG(kFileSystem, << "FileSystemImpl@" << this << "::CancelPendingConnect called before Connect started");
+    return false;
+  }
+
+  // First invoke callback, then do proper teardown in RpcEngine and 
RpcConnection
+  ConnectCallback noop_callback = [](const Status &stat, FileSystem *fs) {
+    LOG_DEBUG(kFileSystem, << "Dummy callback invoked for canceled FileSystem@" << fs << "::Connect with status: " << stat.ToString());
+  };
+
+  bool callback_swapped = false;
+  ConnectCallback original_callback = connect_callback_.AtomicSwapCallback(noop_callback, callback_swapped);
+
+  if(callback_swapped) {
+    // Take original callback and invoke it as if it was canceled.
+    LOG_DEBUG(kFileSystem, << "Swapped in dummy callback.  Invoking connect callback with canceled status.");
+    std::function<void(void)> wrapped_callback = [original_callback, this](){
+      // handling code is expected to check status before dereferencing 'this'
+      original_callback(Status::Canceled(), this);
+    };
+    io_service_->PostTask(wrapped_callback);
+  } else {
+    LOG_INFO(kFileSystem, << "Unable to cancel FileSystem::Connect.  It hasn't been invoked yet or may have already completed.")
+    return false;
+  }
+
+  // Now push cancel down to clean up where possible and make sure the RpcEngine
+  // won't try to do retries in the background.  The rest of the memory cleanup
+  // happens when this FileSystem is deleted by the user.
+  return nn_.CancelPendingConnect();
+}
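
CancelPendingConnect depends on connect_callback_ supporting an atomic swap so that the user's handler runs exactly once, either with the real connect status or with Status::Canceled(). The holder type itself lives elsewhere in the library; a rough sketch of the shape implied by the calls above (member names taken from this file, implementation assumed, <mutex> included):

    // Illustration only: a mutex-guarded callback holder with the operations
    // used by Connect/CancelPendingConnect.  The real libhdfspp type may differ.
    template <typename Callback>
    class SwappableCallbackHolder {
     public:
      void SetCallback(const Callback &cb) {
        std::lock_guard<std::mutex> lock(mtx_);
        callback_ = cb;
        set_ = true;
      }
      bool IsCallbackSet() {
        std::lock_guard<std::mutex> lock(mtx_);
        return set_;
      }
      bool IsCallbackAccessed() {
        std::lock_guard<std::mutex> lock(mtx_);
        return accessed_;
      }
      Callback GetCallback() {
        std::lock_guard<std::mutex> lock(mtx_);
        accessed_ = true;
        return callback_;
      }
      // Replace the stored callback and hand back the original so the caller
      // can invoke it (e.g. with Status::Canceled()); swapped reports whether
      // the exchange happened before anyone fetched the callback.
      Callback AtomicSwapCallback(const Callback &replacement, bool &swapped) {
        std::lock_guard<std::mutex> lock(mtx_);
        Callback original = callback_;
        swapped = set_ && !accessed_;
        if (swapped) {
          callback_ = replacement;
        }
        return original;
      }
     private:
      std::mutex mtx_;
      Callback callback_;
      bool set_ = false;
      bool accessed_ = false;
    };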
+
+void FileSystemImpl::Open(
+    const std::string &path,
+    const std::function<void(const Status &, FileHandle *)> &handler) {
+  LOG_DEBUG(kFileSystem, << "FileSystemImpl::Open("
+                                 << FMT_THIS_ADDR << ", path="
+                                 << path << ") called");
+
+  nn_.GetBlockLocations(path, 0, std::numeric_limits<int64_t>::max(), [this, path, handler](const Status &stat, std::shared_ptr<const struct FileInfo> file_info) {
+    if(!stat.ok()) {
+      LOG_DEBUG(kFileSystem, << "FileSystemImpl::Open failed to get block locations. status=" << stat.ToString());
+      if(stat.get_server_exception_type() == Status::kStandbyException) {
+        LOG_DEBUG(kFileSystem, << "Operation not allowed on standby namenode");
+      }
+    }
+    handler(stat, stat.ok() ? new FileHandleImpl(cluster_name_, path, &io_service_->io_service(), client_name_, file_info, bad_node_tracker_, event_handlers_)
+                            : nullptr);
+  });
+}
+
+
+BlockLocation LocatedBlockToBlockLocation(const hadoop::hdfs::LocatedBlockProto & locatedBlock)
+{
+  BlockLocation result;
+
+  result.setCorrupt(locatedBlock.corrupt());
+  result.setOffset(locatedBlock.offset());
+
+  std::vector<DNInfo> dn_info;
+  dn_info.reserve(locatedBlock.locs_size());
+  for (const hadoop::hdfs::DatanodeInfoProto & datanode_info: locatedBlock.locs()) {
+    const hadoop::hdfs::DatanodeIDProto &id = datanode_info.id();
+    DNInfo newInfo;
+    if (id.has_ipaddr())
+        newInfo.setIPAddr(id.ipaddr());
+    if (id.has_hostname())
+        newInfo.setHostname(id.hostname());
+    if (id.has_xferport())
+        newInfo.setXferPort(id.xferport());
+    if (id.has_infoport())
+        newInfo.setInfoPort(id.infoport());
+    if (id.has_ipcport())
+        newInfo.setIPCPort(id.ipcport());
+    if (id.has_infosecureport())
+      newInfo.setInfoSecurePort(id.infosecureport());
+    if (datanode_info.has_location())
+      newInfo.setNetworkLocation(datanode_info.location());
+    dn_info.push_back(newInfo);
+  }
+  result.setDataNodes(dn_info);
+
+  if (locatedBlock.has_b()) {
+    const hadoop::hdfs::ExtendedBlockProto & b=locatedBlock.b();
+    result.setLength(b.numbytes());
+  }
+
+
+  return result;
+}
+
+void FileSystemImpl::GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length,
+  const std::function<void(const Status &, std::shared_ptr<FileBlockLocation> locations)> handler)
+{
+  LOG_DEBUG(kFileSystem, << "FileSystemImpl::GetBlockLocations("
+                                 << FMT_THIS_ADDR << ", path="
+                                 << path << ") called");
+
+  //Protobuf gives an error 'Negative value is not supported'
+  //if the high bit is set in uint64 in GetBlockLocations
+  if (IsHighBitSet(offset)) {
+    handler(Status::InvalidArgument("GetBlockLocations: argument 'offset' cannot have high bit set"), nullptr);
+    return;
+  }
+  if (IsHighBitSet(length)) {
+    handler(Status::InvalidArgument("GetBlockLocations: argument 'length' cannot have high bit set"), nullptr);
+    return;
+  }
+
+  auto conversion = [handler](const Status & status, std::shared_ptr<const struct FileInfo> fileInfo) {
+    if (status.ok()) {
+      auto result = std::make_shared<FileBlockLocation>();
+
+      result->setFileLength(fileInfo->file_length_);
+      result->setLastBlockComplete(fileInfo->last_block_complete_);
+      result->setUnderConstruction(fileInfo->under_construction_);
+
+      std::vector<BlockLocation> blocks;
+      for (const hadoop::hdfs::LocatedBlockProto & locatedBlock: fileInfo->blocks_) {
+          auto newLocation = LocatedBlockToBlockLocation(locatedBlock);
+          blocks.push_back(newLocation);
+      }
+      result->setBlockLocations(blocks);
+
+      handler(status, result);
+    } else {
+      handler(status, std::shared_ptr<FileBlockLocation>());
+    }
+  };
+
+  nn_.GetBlockLocations(path, offset, length, conversion);
+}
+
+void FileSystemImpl::GetPreferredBlockSize(const std::string &path,
+    const std::function<void(const Status &, const uint64_t &)> &handler) {
+  LOG_DEBUG(kFileSystem, << "FileSystemImpl::GetPreferredBlockSize("
+                                 << FMT_THIS_ADDR << ", path="
+                                 << path << ") called");
+
+  nn_.GetPreferredBlockSize(path, handler);
+}
+
+
+void FileSystemImpl::SetReplication(const std::string & path, int16_t replication, std::function<void(const Status &)> handler) {
+  LOG_DEBUG(kFileSystem,
+      << "FileSystemImpl::SetReplication(" << FMT_THIS_ADDR << ", path=" << path <<
+      ", replication=" << replication << ") called");
+
+  if (path.empty()) {
+    handler(Status::InvalidArgument("SetReplication: argument 'path' cannot be empty"));
+    return;
+  }
+  Status replStatus = FileSystem::CheckValidReplication(replication);
+  if (!replStatus.ok()) {
+    handler(replStatus);
+    return;
+  }
+
+  nn_.SetReplication(path, replication, handler);
+}
+
+
+void FileSystemImpl::SetTimes(const std::string & path, uint64_t mtime, uint64_t atime,
+    std::function<void(const Status &)> handler) {
+  LOG_DEBUG(kFileSystem,
+      << "FileSystemImpl::SetTimes(" << FMT_THIS_ADDR << ", path=" << path <<
+      ", mtime=" << mtime << ", atime=" << atime << ") called");
+
+  if (path.empty()) {
+    handler(Status::InvalidArgument("SetTimes: argument 'path' cannot be empty"));
+    return;
+  }
+
+  nn_.SetTimes(path, mtime, atime, handler);
+}
+
+
+void FileSystemImpl::GetFileInfo(
+    const std::string &path,
+    const std::function<void(const Status &, const StatInfo &)> &handler) {
+  LOG_DEBUG(kFileSystem, << "FileSystemImpl::GetFileInfo("
+                                 << FMT_THIS_ADDR << ", path="
+                                 << path << ") called");
+
+  nn_.GetFileInfo(path, handler);
+}
+
+void FileSystemImpl::GetContentSummary(
+    const std::string &path,
+    const std::function<void(const Status &, const ContentSummary &)> &handler) {
+  LOG_DEBUG(kFileSystem, << "FileSystemImpl::GetContentSummary("
+                                 << FMT_THIS_ADDR << ", path="
+                                 << path << ") called");
+
+  nn_.GetContentSummary(path, handler);
+}
+
+void FileSystemImpl::GetFsStats(
+    const std::function<void(const Status &, const FsInfo &)> &handler) {
+  LOG_DEBUG(kFileSystem,
+      << "FileSystemImpl::GetFsStats(" << FMT_THIS_ADDR << ") called");
+
+  nn_.GetFsStats(handler);
+}
+
+
+/**
+ * Helper function for recursive GetListing calls.
+ *
+ * Some compilers don't like recursive lambdas, so we make the lambda call a
+ * method, which in turn creates a lambda calling itself.
+ */
+void FileSystemImpl::GetListingShim(const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more,
+                      std::string path, const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler) {
+  bool has_next = !stat_infos.empty();
+  bool get_more = handler(stat, stat_infos, has_more && has_next);
+  if (get_more && has_more && has_next ) {
+    auto callback = [this, path, handler](const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more) {
+      GetListingShim(stat, stat_infos, has_more, path, handler);
+    };
+
+    std::string last = stat_infos.back().path;
+    nn_.GetListing(path, callback, last);
+  }
+}
+
+void FileSystemImpl::GetListing(
+    const std::string &path,
+    const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler) {
+  LOG_DEBUG(kFileSystem, << "FileSystemImpl::GetListing("
+                                 << FMT_THIS_ADDR << ", path="
+                                 << path << ") called");
+  std::string path_fixed = path;
+  if(path.back() != '/'){
+    path_fixed += "/";
+  }
+  // Capture the state and push it into the shim
+  auto callback = [this, path_fixed, handler](const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more) {
+    GetListingShim(stat, stat_infos, has_more, path_fixed, handler);
+  };
+
+  nn_.GetListing(path_fixed, callback);
+}
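
A consumer-side sketch of the listing API (not part of the patch; assumes a connected FileSystemImpl and <iostream>): the bool returned from the handler is how the caller asks for, or stops, further batches from GetListingShim.

    // Sketch only: print one directory, pulling additional batches until the
    // namenode reports there are no more entries.
    void print_directory(hdfs::FileSystemImpl *fs) {
      fs->GetListing("/tmp/", [](const hdfs::Status &status,
                                 const std::vector<hdfs::StatInfo> &entries,
                                 bool has_more) -> bool {
        if (!status.ok()) {
          return false;            // stop on error
        }
        for (const auto &entry : entries) {
          std::cout << entry.path << "\n";
        }
        return has_more;           // true asks the shim for the next batch
      });
    }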
+
+
+void FileSystemImpl::Mkdirs(const std::string & path, uint16_t permissions, bool createparent,
+    std::function<void(const Status &)> handler) {
+  LOG_DEBUG(kFileSystem,
+      << "FileSystemImpl::Mkdirs(" << FMT_THIS_ADDR << ", path=" << path <<
+      ", permissions=" << permissions << ", createparent=" << createparent << ") called");
+
+  if (path.empty()) {
+    handler(Status::InvalidArgument("Mkdirs: argument 'path' cannot be empty"));
+    return;
+  }
+
+  Status permStatus = FileSystem::CheckValidPermissionMask(permissions);
+  if (!permStatus.ok()) {
+    handler(permStatus);
+    return;
+  }
+
+  nn_.Mkdirs(path, permissions, createparent, handler);
+}
+
+
+void FileSystemImpl::Delete(const std::string &path, bool recursive,
+    const std::function<void(const Status &)> &handler) {
+  LOG_DEBUG(kFileSystem,
+      << "FileSystemImpl::Delete(" << FMT_THIS_ADDR << ", path=" << path << ", 
recursive=" << recursive << ") called");
+
+  if (path.empty()) {
+    handler(Status::InvalidArgument("Delete: argument 'path' cannot be 
empty"));
+    return;
+  }
+
+  nn_.Delete(path, recursive, handler);
+}
+
+
+void FileSystemImpl::Rename(const std::string &oldPath, const std::string 
&newPath,
+    const std::function<void(const Status &)> &handler) {
+  LOG_DEBUG(kFileSystem,
+      << "FileSystemImpl::Rename(" << FMT_THIS_ADDR << ", oldPath=" << oldPath 
<< ", newPath=" << newPath << ") called");
+
+  if (oldPath.empty()) {
+    handler(Status::InvalidArgument("Rename: argument 'oldPath' cannot be 
empty"));
+    return;
+  }
+
+  if (newPath.empty()) {
+    handler(Status::InvalidArgument("Rename: argument 'newPath' cannot be 
empty"));
+    return;
+  }
+
+  nn_.Rename(oldPath, newPath, handler);
+}
+
+
+void FileSystemImpl::SetPermission(const std::string & path,
+    uint16_t permissions, const std::function<void(const Status &)> &handler) {
+  LOG_DEBUG(kFileSystem,
+      << "FileSystemImpl::SetPermission(" << FMT_THIS_ADDR << ", path=" << 
path << ", permissions=" << permissions << ") called");
+
+  if (path.empty()) {
+    handler(Status::InvalidArgument("SetPermission: argument 'path' cannot be 
empty"));
+    return;
+  }
+  Status permStatus = FileSystem::CheckValidPermissionMask(permissions);
+  if (!permStatus.ok()) {
+    handler(permStatus);
+    return;
+  }
+
+  nn_.SetPermission(path, permissions, handler);
+}
+
+
+void FileSystemImpl::SetOwner(const std::string & path, const std::string & 
username,
+    const std::string & groupname, const std::function<void(const Status &)> 
&handler) {
+  LOG_DEBUG(kFileSystem,
+      << "FileSystemImpl::SetOwner(" << FMT_THIS_ADDR << ", path=" << path << 
", username=" << username << ", groupname=" << groupname << ") called");
+
+  if (path.empty()) {
+    handler(Status::InvalidArgument("SetOwner: argument 'path' cannot be 
empty"));
+    return;
+  }
+
+  nn_.SetOwner(path, username, groupname, handler);
+}
+
+
+/**
+ * Helper function for recursive Find calls.
+ *
+ * Some compilers don't like recursive lambdas, so we make the lambda call a
+ * method, which in turn creates a lambda calling itself.
+ *
+ * ***High-level explanation***
+ *
+ * Since wildcards are allowed in both path and name, we start by expanding the path first.
+ * Boolean search_path is set to true while we are searching for the path and false once we
+ * are searching for the name. When searching for the path, we break the given path pattern
+ * into sub-directories. Starting from the first sub-directory, we list them one by one and
+ * recursively continue into directories that matched the path pattern at the current depth.
+ * Directories that are large will be asked to continue sending results. We track the current
+ * depth within the path pattern in the 'depth' variable. This continues recursively until the
+ * depth reaches the end of the path; after that we start matching the name pattern. From that
+ * point on we recurse into every directory we find, and all names that match the given name
+ * pattern are stored in outputs and later sent back to the user.
+ */
+void FileSystemImpl::FindShim(const Status &stat, const std::vector<StatInfo> & stat_infos, bool directory_has_more,
+                      std::shared_ptr<FindOperationalState> operational_state, std::shared_ptr<FindSharedState> shared_state) {
+  //We buffer the outputs then send them back at the end
+  std::vector<StatInfo> outputs;
+  //Return on error
+  if(!stat.ok()){
+    std::lock_guard<std::mutex> find_lock(shared_state->lock);
+    //We send true because we do not want the user code to exit before all our requests have finished
+    shared_state->handler(stat, outputs, true);
+    shared_state->aborted = true;
+  }
+  if(!shared_state->aborted){
+    //User did not abort the operation
+    if (directory_has_more) {
+      //Directory is large and has more results
+      //We launch another async call to get more results
+      shared_state->outstanding_requests++;
+      auto callback = [this, operational_state, shared_state](const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more) {
+        FindShim(stat, stat_infos, has_more, operational_state, shared_state);
+      };
+      std::string last = stat_infos.back().path;
+      nn_.GetListing(operational_state->path, callback, last);
+    }
+    if(operational_state->search_path && operational_state->depth < shared_state->dirs.size() - 1){
+      //We are searching for the path and did not reach the end of the path yet
+      for (StatInfo const& si : stat_infos) {
+        //If we are at the last depth and it matches both path and name, we need to output it.
+        if (operational_state->depth == shared_state->dirs.size() - 2
+            && !fnmatch(shared_state->dirs[operational_state->depth + 1].c_str(), si.path.c_str(), 0)
+            && !fnmatch(shared_state->name.c_str(), si.path.c_str(), 0)) {
+          outputs.push_back(si);
+        }
+        //Skip if not directory
+        if(si.file_type != StatInfo::IS_DIR) {
+          continue;
+        }
+        //Checking for a match with the path at the current depth
+        if(!fnmatch(shared_state->dirs[operational_state->depth + 1].c_str(), si.path.c_str(), 0)){
+          //Launch a new requests for every matched directory
+          shared_state->outstanding_requests++;
+          auto callback = [this, si, operational_state, shared_state](const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more) {
+            std::shared_ptr<FindOperationalState> new_current_state = std::make_shared<FindOperationalState>(si.full_path, operational_state->depth + 1, true);  //true because searching for the path
+            FindShim(stat, stat_infos, has_more, new_current_state, shared_state);
+          };
+          nn_.GetListing(si.full_path, callback);
+        }
+      }
+    }
+    else if(shared_state->maxdepth > operational_state->depth - shared_state->dirs.size() + 1){
+      //We are searching for the name now and maxdepth has not been reached
+      for (StatInfo const& si : stat_infos) {
+        //Launch a new request for every directory
+        if(si.file_type == StatInfo::IS_DIR) {
+          shared_state->outstanding_requests++;
+          auto callback = [this, si, operational_state, shared_state](const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more) {
+            std::shared_ptr<FindOperationalState> new_current_state = std::make_shared<FindOperationalState>(si.full_path, operational_state->depth + 1, false); //false because searching for the name
+            FindShim(stat, stat_infos, has_more, new_current_state, shared_state);
+          };
+          nn_.GetListing(si.full_path, callback);
+        }
+        //All names that match the specified name are saved to outputs
+        if(!fnmatch(shared_state->name.c_str(), si.path.c_str(), 0)){
+          outputs.push_back(si);
+        }
+      }
+    }
+  }
+  //This section needs a lock to make sure we return the final chunk only once
+  //and no results are sent after aborted is set
+  std::lock_guard<std::mutex> find_lock(shared_state->lock);
+  //Decrement the counter once since we are done with this chunk
+  shared_state->outstanding_requests--;
+  if(shared_state->outstanding_requests == 0){
+    //Send the outputs back to the user and notify that this is the final chunk
+    shared_state->handler(stat, outputs, false);
+  } else {
+    //There will be more results and we are not aborting
+    if (outputs.size() > 0 && !shared_state->aborted){
+      //Send the outputs back to the user and notify that there is more
+      bool user_wants_more = shared_state->handler(stat, outputs, true);
+      if(!user_wants_more) {
+        //Abort if user doesn't want more
+        shared_state->aborted = true;
+      }
+    }
+  }
+}
+
+void FileSystemImpl::Find(
+    const std::string &path, const std::string &name, const uint32_t maxdepth,
+    const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler) {
+  LOG_DEBUG(kFileSystem, << "FileSystemImpl::Find("
+                                 << FMT_THIS_ADDR << ", path="
+                                 << path << ", name="
+                                 << name << ") called");
+
+  //Populating the operational state, which includes:
+  //current search path, depth within the path, and the indication that we are currently searching for a path (not name yet).
+  std::shared_ptr<FindOperationalState> operational_state  = std::make_shared<FindOperationalState>(path, 0, true);
+  //Populating the shared state, which includes:
+  //vector of sub-directories constructed from path, name to search, handler to use for result returning, outstanding_requests counter, and aborted flag.
+  std::shared_ptr<FindSharedState> shared_state = std::make_shared<FindSharedState>(path, name, maxdepth, handler, 1, false);
+  auto callback = [this, operational_state, shared_state](const Status &stat, const std::vector<StatInfo> & stat_infos, bool directory_has_more) {
+    FindShim(stat, stat_infos, directory_has_more, operational_state, shared_state);
+  };
+  nn_.GetListing("/", callback);
+}
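
Usage sketch for Find (not part of the patch; assumes a connected FileSystemImpl and <iostream>): results are streamed in chunks, and the final chunk is signaled with has_more == false.

    // Sketch only: print every entry under /logs whose name matches *.log,
    // searching to the default (unbounded) depth.
    void find_logs(hdfs::FileSystemImpl *fs) {
      fs->Find("/logs/", "*.log", hdfs::FileSystem::GetDefaultFindMaxDepth(),
          [](const hdfs::Status &status, const std::vector<hdfs::StatInfo> &chunk,
             bool has_more) -> bool {
            if (!status.ok()) {
              return false;        // stop asking for more on error
            }
            for (const auto &si : chunk) {
              std::cout << si.full_path << "\n";
            }
            return true;           // keep receiving chunks until has_more is false
          });
    }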
+
+
+void FileSystemImpl::CreateSnapshot(const std::string &path,
+    const std::string &name,
+    const std::function<void(const Status &)> &handler) {
+  LOG_DEBUG(kFileSystem,
+      << "FileSystemImpl::CreateSnapshot(" << FMT_THIS_ADDR << ", path=" << 
path << ", name=" << name << ") called");
+
+  if (path.empty()) {
+    handler(Status::InvalidArgument("CreateSnapshot: argument 'path' cannot be 
empty"));
+    return;
+  }
+
+  nn_.CreateSnapshot(path, name, handler);
+}
+
+
+void FileSystemImpl::DeleteSnapshot(const std::string &path,
+    const std::string &name,
+    const std::function<void(const Status &)> &handler) {
+  LOG_DEBUG(kFileSystem,
+      << "FileSystemImpl::DeleteSnapshot(" << FMT_THIS_ADDR << ", path=" << 
path << ", name=" << name << ") called");
+
+  if (path.empty()) {
+    handler(Status::InvalidArgument("DeleteSnapshot: argument 'path' cannot be 
empty"));
+    return;
+  }
+  if (name.empty()) {
+    handler(Status::InvalidArgument("DeleteSnapshot: argument 'name' cannot be 
empty"));
+    return;
+  }
+
+  nn_.DeleteSnapshot(path, name, handler);
+}
+
+void FileSystemImpl::RenameSnapshot(const std::string &path,
+    const std::string &old_name, const std::string &new_name,
+    const std::function<void(const Status &)> &handler) {
+  LOG_DEBUG(kFileSystem,
+    << "FileSystemImpl::RenameSnapshot(" << FMT_THIS_ADDR << ", path=" << path 
<<
+    ", old_name=" << old_name << ", new_name=" << new_name << ") called");
+
+  if (path.empty()) {
+    handler(Status::InvalidArgument("RenameSnapshot: argument 'path' cannot be 
empty"));
+    return;
+  }
+  if (old_name.empty()) {
+    handler(Status::InvalidArgument("RenameSnapshot: argument 'old_name' 
cannot be empty"));
+    return;
+  }
+  if (new_name.empty()) {
+    handler(Status::InvalidArgument("RenameSnapshot: argument 'new_name' 
cannot be empty"));
+    return;
+  }
+
+  nn_.RenameSnapshot(path, old_name, new_name, handler);
+}
+
+void FileSystemImpl::AllowSnapshot(const std::string &path,
+    const std::function<void(const Status &)> &handler) {
+  LOG_DEBUG(kFileSystem,
+      << "FileSystemImpl::AllowSnapshot(" << FMT_THIS_ADDR << ", path=" << 
path << ") called");
+
+  if (path.empty()) {
+    handler(Status::InvalidArgument("AllowSnapshot: argument 'path' cannot be 
empty"));
+    return;
+  }
+
+  nn_.AllowSnapshot(path, handler);
+}
+
+
+void FileSystemImpl::DisallowSnapshot(const std::string &path,
+    const std::function<void(const Status &)> &handler) {
+  LOG_DEBUG(kFileSystem,
+      << "FileSystemImpl::DisallowSnapshot(" << FMT_THIS_ADDR << ", path=" << 
path << ") called");
+
+  if (path.empty()) {
+    handler(Status::InvalidArgument("DisallowSnapshot: argument 'path' cannot 
be empty"));
+    return;
+  }
+
+  nn_.DisallowSnapshot(path, handler);
+}
+
+void FileSystemImpl::SetFsEventCallback(fs_event_callback callback) {
+  if (event_handlers_) {
+    event_handlers_->set_fs_callback(callback);
+    nn_.SetFsEventCallback(callback);
+  }
+}
+
+
+
+std::shared_ptr<LibhdfsEvents> FileSystemImpl::get_event_handlers() {
+  return event_handlers_;
+}
+
+Options FileSystemImpl::get_options() {
+  return options_;
+}
+
+std::string FileSystemImpl::get_cluster_name() {
+  return cluster_name_;
+}
+
+}
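For reviewers, here is a minimal caller-side sketch of the handler-based Find API implemented above. It is not part of this patch: it assumes an hdfs::FileSystem instance `fs` that has already been created and connected through the public libhdfspp API, the path, name pattern, and maxdepth values are illustrative, and the StatInfo field used for printing (full_path) is assumed from the public StatInfo definition.

#include "hdfspp/hdfspp.h"

#include <iostream>
#include <vector>

void find_example(hdfs::FileSystem *fs) {
  // Results arrive in batches; the trailing bool reports whether more batches
  // are expected, and the handler's return value tells Find whether the caller
  // still wants them (returning false sets the shared aborted flag).
  fs->Find("/data", "*.txt", 10 /* maxdepth, illustrative */,
           [](const hdfs::Status &status,
              const std::vector<hdfs::StatInfo> &batch, bool /*has_more*/) -> bool {
             if (!status.ok()) {
               std::cerr << "Find failed: " << status.ToString() << std::endl;
               return false;  // abort the recursive search
             }
             for (const hdfs::StatInfo &info : batch) {
               std::cout << info.full_path << std::endl;
             }
             return true;  // keep receiving batches
           });
}

The same handler-based pattern applies to the snapshot calls above, with the difference that their handlers take only a Status and that empty arguments are rejected with Status::InvalidArgument before any RPC is issued.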

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
new file mode 100644
index 0000000..f2e9abd
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBHDFSPP_LIB_FS_FILESYSTEM_H_
+#define LIBHDFSPP_LIB_FS_FILESYSTEM_H_
+
+#include "filehandle.h"
+#include "hdfspp/hdfspp.h"
+#include "fs/bad_datanode_tracker.h"
+#include "reader/block_reader.h"
+#include "reader/fileinfo.h"
+
+#include "asio.hpp"
+
+#include <thread>
+#include "namenode_operations.h"
+
+namespace hdfs {
+
+/*
+ * FileSystem: The consumer's main point of interaction with the cluster as
+ * a whole.
+ *
+ * Initially constructed in a disconnected state; call Connect before operating
+ * on the FileSystem.
+ *
+ * All open files must be closed before the FileSystem is destroyed.
+ *
+ * Threading model: thread-safe for all operations
+ * Lifetime: pointer created for consumer who is responsible for deleting it
+ */
+class FileSystemImpl : public FileSystem {
+public:
+  MEMCHECKED_CLASS(FileSystemImpl)
+  typedef std::function<void(const Status &, FileSystem *)> ConnectCallback;
+
+  explicit FileSystemImpl(IoService *&io_service, const std::string& user_name, const Options &options);
+  explicit FileSystemImpl(std::shared_ptr<IoService>, const std::string& user_name, const Options &options);
+  ~FileSystemImpl() override;
+
+  /* attempt to connect to namenode, return bad status on failure */
+  void Connect(const std::string &server, const std::string &service,
+               const std::function<void(const Status &, FileSystem *)> &handler) override;
+  /* attempt to connect to namenode, return bad status on failure */
+  Status Connect(const std::string &server, const std::string &service) override;
+
+  /* Connect to the NN indicated in options.defaultFs */
+  virtual void ConnectToDefaultFs(
+      const std::function<void(const Status &, FileSystem *)> &handler) override;
+  virtual Status ConnectToDefaultFs() override;
+
+  /* Cancel connection if FS is in the middle of one */
+  virtual bool CancelPendingConnect() override;
+
+  virtual void Open(const std::string &path,
+                    const std::function<void(const Status &, FileHandle *)>
+                        &handler) override;
+  Status Open(const std::string &path, FileHandle **handle) override;
+
+  virtual void GetPreferredBlockSize(const std::string &path,
+      const std::function<void(const Status &, const uint64_t &)> &handler) override;
+  virtual Status GetPreferredBlockSize(const std::string &path, uint64_t & block_size) override;
+
+  virtual void SetReplication(const std::string & path, int16_t replication, std::function<void(const Status &)> handler) override;
+  virtual Status SetReplication(const std::string & path, int16_t replication) override;
+
+  void SetTimes(const std::string & path, uint64_t mtime, uint64_t atime, std::function<void(const Status &)> handler) override;
+  Status SetTimes(const std::string & path, uint64_t mtime, uint64_t atime) override;
+
+  void GetFileInfo(
+      const std::string &path,
+      const std::function<void(const Status &, const StatInfo &)> &handler) override;
+
+  Status GetFileInfo(const std::string &path, StatInfo & stat_info) override;
+
+  void GetContentSummary(const std::string &path,
+        const std::function<void(const Status &, const ContentSummary &)> &handler) override;
+  Status GetContentSummary(const std::string &path, ContentSummary & stat_info) override;
+
+  /**
+   * Retrieves the file system information such as the total raw size of all files in the filesystem
+   * and the raw capacity of the filesystem
+   *
+   *  @param FsInfo      struct to be populated by GetFsStats
+   **/
+  void GetFsStats(
+      const std::function<void(const Status &, const FsInfo &)> &handler) override;
+
+  Status GetFsStats(FsInfo & fs_info) override;
+
+  void GetListing(
+        const std::string &path,
+        const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler) override;
+
+  Status GetListing(const std::string &path, std::vector<StatInfo> * stat_infos) override;
+
+  virtual void GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length,
+    const std::function<void(const Status &, std::shared_ptr<FileBlockLocation> locations)> ) override;
+  virtual Status GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length,
+    std::shared_ptr<FileBlockLocation> * locations) override;
+
+  virtual void Mkdirs(const std::string & path, uint16_t permissions, bool createparent,
+      std::function<void(const Status &)> handler) override;
+  virtual Status Mkdirs(const std::string & path, uint16_t permissions, bool createparent) override;
+
+  virtual void Delete(const std::string &path, bool recursive,
+      const std::function<void(const Status &)> &handler) override;
+  virtual Status Delete(const std::string &path, bool recursive) override;
+
+  virtual void Rename(const std::string &oldPath, const std::string &newPath,
+      const std::function<void(const Status &)> &handler) override;
+  virtual Status Rename(const std::string &oldPath, const std::string &newPath) override;
+
+  virtual void SetPermission(const std::string & path, uint16_t permissions,
+      const std::function<void(const Status &)> &handler) override;
+  virtual Status SetPermission(const std::string & path, uint16_t permissions) override;
+
+  virtual void SetOwner(const std::string & path, const std::string & username,
+      const std::string & groupname, const std::function<void(const Status &)> &handler) override;
+  virtual Status SetOwner(const std::string & path,
+      const std::string & username, const std::string & groupname) override;
+
+  void Find(
+          const std::string &path, const std::string &name, const uint32_t maxdepth,
+          const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler) override;
+  Status Find(const std::string &path, const std::string &name, const uint32_t maxdepth, std::vector<StatInfo> * stat_infos) override;
+
+  /*****************************************************************************
+   *                    FILE SYSTEM SNAPSHOT FUNCTIONS
+   ****************************************************************************/
+
+  /**
+   * Creates a snapshot of a snapshottable directory specified by path
+   *
+   *  @param path    Path to the directory to be snapshotted (must be non-empty)
+   *  @param name    Name to be given to the created snapshot (may be empty)
+   **/
+  void CreateSnapshot(const std::string &path, const std::string &name,
+      const std::function<void(const Status &)> &handler) override;
+  Status CreateSnapshot(const std::string &path, const std::string &name) override;
+
+  /**
+   * Deletes the directory snapshot specified by path and name
+   *
+   *  @param path    Path to the snapshotted directory (must be non-empty)
+   *  @param name    Name of the snapshot to be deleted (must be non-empty)
+   **/
+  void DeleteSnapshot(const std::string &path, const std::string &name,
+        const std::function<void(const Status &)> &handler) override;
+  Status DeleteSnapshot(const std::string &path, const std::string &name) override;
+
+  /**
+   * Renames the directory snapshot specified by path from old_name to new_name
+   *
+   *  @param path       Path to the snapshotted directory (must be non-blank)
+   *  @param old_name   Current name of the snapshot (must be non-blank)
+   *  @param new_name   New name of the snapshot (must be non-blank)
+   **/
+  void RenameSnapshot(const std::string &path, const std::string &old_name,
+      const std::string &new_name, const std::function<void(const Status &)> &handler) override;
+  Status RenameSnapshot(const std::string &path, const std::string &old_name,
+      const std::string &new_name) override;
+
+  /**
+   * Allows snapshots to be made on the specified directory
+   *
+   *  @param path    Path to the directory to be made snapshottable (must be non-empty)
+   **/
+  void AllowSnapshot(const std::string &path, const std::function<void(const Status &)> &handler) override;
+  Status AllowSnapshot(const std::string &path) override;
+
+  /**
+   * Disallows snapshots from being made on the specified directory
+   *
+   *  @param path    Path to the directory to be made non-snapshottable (must be non-empty)
+   **/
+  void DisallowSnapshot(const std::string &path, const std::function<void(const Status &)> &handler) override;
+  Status DisallowSnapshot(const std::string &path) override;
+
+  void SetFsEventCallback(fs_event_callback callback) override;
+
+  /* add a new thread to handle asio requests, return number of threads in pool
+   */
+  int AddWorkerThread();
+
+  /* how many worker threads are servicing asio requests */
+  int WorkerThreadCount();
+
+  /* all monitored events will need to look up handlers */
+  std::shared_ptr<LibhdfsEvents> get_event_handlers();
+
+  Options get_options() override;
+
+  std::string get_cluster_name() override;
+
+private:
+  /**
+   *  The IoService must be the first member variable to ensure that it gets
+   *  destroyed last.  This allows other members to dequeue things from the
+   *  service in their own destructors.
+   *  A side effect of this is that requests may outlive the RpcEngine they
+   *  reference.
+   **/
+  std::shared_ptr<IoServiceImpl> io_service_;
+  const Options options_;
+  const std::string client_name_;
+  std::string cluster_name_;
+  NameNodeOperations nn_;
+  std::shared_ptr<BadDataNodeTracker> bad_node_tracker_;
+
+  // Keep connect callback around in case it needs to be canceled
+  SwappableCallbackHolder<ConnectCallback> connect_callback_;
+
+  /**
+   * Runtime event monitoring handlers.
+   * Note:  This is really handy to have for advanced usage but
+   * exposes implementation details that may change at any time.
+   **/
+  std::shared_ptr<LibhdfsEvents> event_handlers_;
+
+  void GetListingShim(const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more,
+              std::string path, const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler);
+
+  struct FindSharedState {
+    //Name pattern (can have wild-cards) to find
+    const std::string name;
+    //Maximum depth to recurse after the end of path is reached.
+    //Can be set to 0 for pure path globbing and ignoring name pattern entirely.
+    const uint32_t maxdepth;
+    //Vector of all sub-directories from the path argument (each can have wild-cards)
+    std::vector<std::string> dirs;
+    //Callback from Find
+    const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> handler;
+    //outstanding_requests is incremented once for every GetListing call.
+    std::atomic<uint64_t> outstanding_requests;
+    //Boolean needed to abort all recursion on error or on user command
+    std::atomic<bool> aborted;
+    //Shared variables will need protection with a lock
+    std::mutex lock;
+    FindSharedState(const std::string path_, const std::string name_, const uint32_t maxdepth_,
+                const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> handler_,
+                uint64_t outstanding_requests_, bool aborted_)
+        : name(name_),
+          maxdepth(maxdepth_),
+          handler(handler_),
+          outstanding_requests(outstanding_requests_),
+          aborted(aborted_),
+          lock() {
+      //Constructing the list of sub-directories
+      std::stringstream ss(path_);
+      if(path_.back() != '/'){
+        ss << "/";
+      }
+      for (std::string token; std::getline(ss, token, '/'); ) {
+        dirs.push_back(token);
+      }
+    }
+  };
+
+  struct FindOperationalState {
+    const std::string path;
+    const uint32_t depth;
+    const bool search_path;
+    FindOperationalState(const std::string path_, const uint32_t depth_, const bool search_path_)
+        : path(path_),
+          depth(depth_),
+          search_path(search_path_) {
+    }
+  };
+
+  void FindShim(const Status &stat, const std::vector<StatInfo> & stat_infos,
+                bool directory_has_more, std::shared_ptr<FindOperationalState> current_state, std::shared_ptr<FindSharedState> shared_state);
+
+};
+}
+
+#endif
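As a side note on FindSharedState above: its constructor tokenizes the search path into per-level components by reading a std::stringstream with '/' as the delimiter. A self-contained sketch of just that splitting step follows; the function and variable names are illustrative and not part of the patch.

#include <iostream>
#include <sstream>
#include <string>
#include <vector>

// Split an HDFS-style path into its components, mirroring the
// delimiter-based std::getline loop used by FindSharedState above.
std::vector<std::string> split_path(const std::string &path) {
  std::vector<std::string> dirs;
  std::stringstream ss(path);
  for (std::string token; std::getline(ss, token, '/');) {
    dirs.push_back(token);  // a leading '/' yields an empty first component
  }
  return dirs;
}

int main() {
  for (const std::string &component : split_path("/tmp/a*/b")) {
    std::cout << "[" << component << "]" << std::endl;
  }
  // Output (one component per line): [] [tmp] [a*] [b]
  return 0;
}

Each component may still contain wild-cards; Find matches them level by level against GetListing results, using outstanding_requests and the aborted flag to coordinate the concurrent recursion.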

