fs

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e6ca03c0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e6ca03c0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e6ca03c0

Branch: refs/heads/8b02d962b291afe4b08c47f0398c1db0709419a1
Commit: e6ca03c0357b4b9061349e9903a2064ee0c51715
Parents: f34bda7
Author: Haohui Mai <whe...@apache.org>
Authored: Tue Jul 14 12:43:09 2015 -0700
Committer: Haohui Mai <whe...@apache.org>
Committed: Tue Jul 14 12:47:00 2015 -0700

----------------------------------------------------------------------
 .../native/libhdfspp/include/libhdfspp/hdfs.h   |  50 ++++++
 .../main/native/libhdfspp/lib/common/hdfs.cc    |  29 ++++
 .../main/native/libhdfspp/lib/common/wrapper.h  |  42 +++++
 .../main/native/libhdfspp/lib/fs/CMakeLists.txt |   4 +
 .../main/native/libhdfspp/lib/fs/filesystem.cc  |  95 +++++++++++
 .../main/native/libhdfspp/lib/fs/filesystem.h   |  61 +++++++
 .../main/native/libhdfspp/lib/fs/inputstream.cc |  53 ++++++
 .../native/libhdfspp/lib/fs/inputstream_impl.h  | 160 +++++++++++++++++++
 .../native/libhdfspp/lib/fs/inputstream_test.cc |  82 ++++++++++
 .../native/libhdfspp/lib/fs/namenode_protocol.h |  42 +++++
 10 files changed, 618 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h
new file mode 100644
index 0000000..d12f20e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBHDFSPP_HDFS_H_
+#define LIBHDFSPP_HDFS_H_
+
+#include "libhdfspp/status.h"
+
+namespace hdfs {
+
+/**
+ * Abstract event loop that drives the library's asynchronous operations.
+ * Concrete instances are obtained from New().
+ */
+class IoService {
+ public:
+  /**
+   * Allocates a new IoService on the heap. No owner is recorded here, so
+   * the caller presumably deletes it (the bundled test keeps it in a
+   * std::unique_ptr).
+   */
+  static IoService *New();
+  /** Runs the event loop on the calling thread. */
+  virtual void Run() = 0;
+  /** Asks the event loop to stop. */
+  virtual void Stop() = 0;
+  virtual ~IoService();
+};
+
+
+/**
+ * Read-only handle to an opened file; instances are produced by
+ * FileSystem::Open().
+ */
+class InputStream {
+ public:
+  /**
+   * Reads up to nbyte bytes starting at file position `offset` into buf
+   * and stores the number of bytes actually read in *read_bytes.
+   * The implementation in this patch writes *read_bytes unconditionally,
+   * so read_bytes must not be null.
+   */
+  virtual Status PositionRead(void *buf, size_t nbyte, size_t offset, size_t 
*read_bytes) = 0;
+  virtual ~InputStream();
+};
+
+/**
+ * Client-side view of an HDFS file system.
+ */
+class FileSystem {
+ public:
+  /**
+   * Creates a FileSystem that runs on io_service and connects it to the
+   * namenode at server:port. *fsptr is assigned only when the returned
+   * Status is ok; the caller then owns the object.
+   */
+  static Status New(IoService *io_service, const char *server,
+                    unsigned short port, FileSystem **fsptr);
+  /**
+   * Opens `path` for reading. On success *isptr receives a heap-allocated
+   * InputStream that the caller owns.
+   */
+  virtual Status Open(const char *path, InputStream **isptr) = 0;
+  virtual ~FileSystem();
+};
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/hdfs.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/hdfs.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/hdfs.cc
new file mode 100644
index 0000000..e7a5d6c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/hdfs.cc
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "wrapper.h"
+
+namespace hdfs {
+
+// Out-of-line definition of the virtual destructor declared in
+// libhdfspp/hdfs.h.
+IoService::~IoService() {}
+
+// Factory for the public IoService interface: returns a heap-allocated
+// IoServiceImpl (declared in wrapper.h). Nothing here records an owner, so
+// releasing the object is left to the caller.
+IoService *IoService::New() {
+  return new IoServiceImpl();
+}
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/wrapper.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/wrapper.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/wrapper.h
new file mode 100644
index 0000000..39d26cc
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/wrapper.h
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef COMMON_WRAPPER_H_
+#define COMMON_WRAPPER_H_
+
+#include "libhdfspp/hdfs.h"
+
+#include <asio/io_service.hpp>
+
+namespace hdfs {
+
+/**
+ * IoService implementation that wraps a standalone ::asio::io_service.
+ */
+class IoServiceImpl : public IoService {
+ public:
+  // A work object is held for the duration of the call; per asio's
+  // documented semantics this keeps run() from returning merely because no
+  // handlers are pending, so the loop runs until Stop() is called.
+  virtual void Run() override {
+    asio::io_service::work work(io_service_);
+    io_service_.run();
+  }
+  // Forwards to asio::io_service::stop().
+  virtual void Stop() override { io_service_.stop(); }
+  // Gives library internals access to the wrapped asio service.
+  ::asio::io_service &io_service() { return io_service_; }
+ private:
+  ::asio::io_service io_service_;
+};
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt
new file mode 100644
index 0000000..bd649ff
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt
@@ -0,0 +1,4 @@
+# Library holding the FileSystem / InputStream implementation.
+add_library(fs filesystem.cc inputstream.cc)
+# The sources include generated protobuf headers, so the proto target must
+# be built first.
+add_dependencies(fs proto)
+# Command-line program that reads the beginning of a file from a cluster.
+add_executable(inputstream_test inputstream_test.cc)
+target_link_libraries(inputstream_test common fs rpc reader proto 
${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
new file mode 100644
index 0000000..ab322c6
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "filesystem.h"
+
+#include "common/util.h"
+
+#include <asio/ip/tcp.hpp>
+
+#include <limits>
+
+namespace hdfs {
+
+// Protocol name and version that FileSystemImpl's constructor hands to the
+// RpcEngine.
+static const char kNamenodeProtocol[] = 
"org.apache.hadoop.hdfs.protocol.ClientProtocol";
+static const int kNamenodeProtocolVersion = 1;
+
+using ::asio::ip::tcp;
+
+// Out-of-line definition of the virtual destructor declared in
+// libhdfspp/hdfs.h.
+FileSystem::~FileSystem()
+{}
+
+// Builds a FileSystemImpl on top of io_service and connects it to the
+// namenode at server:port. Ownership of the new object passes to the caller
+// through *fsptr only when the connection succeeds; on failure the object
+// is destroyed and *fsptr is left untouched.
+Status FileSystem::New(IoService *io_service, const char *server,
+                       unsigned short port, FileSystem **fsptr) {
+  std::unique_ptr<FileSystemImpl> fs(new FileSystemImpl(io_service));
+  Status status = fs->Connect(server, port);
+  if (!status.ok()) {
+    return status;
+  }
+  *fsptr = fs.release();
+  return status;
+}
+
+// Binds the file system to io_service and builds the RPC engine and the
+// namenode stub on top of it. io_service is downcast with an unchecked
+// static_cast, so it must really be an IoServiceImpl (which is what
+// IoService::New() returns). The initializers depend on one another in
+// this order: engine_ uses io_service_, namenode_ uses engine_.
+FileSystemImpl::FileSystemImpl(IoService *io_service)
+    : io_service_(static_cast<IoServiceImpl*>(io_service))
+    , engine_(&io_service_->io_service(), RpcEngine::GetRandomClientName(),
+              kNamenodeProtocol, kNamenodeProtocolVersion)
+    , namenode_(&engine_)
+{}
+
+// Resolves server:port to its IPv4 endpoints, connects the RPC engine to
+// them and, if that succeeds, starts the engine. A resolver error is
+// converted with ToStatus(); a connect error is returned as-is.
+Status FileSystemImpl::Connect(const char *server, unsigned short port) {
+  tcp::resolver resolver(io_service_->io_service());
+  const tcp::resolver::query query(tcp::v4(), server, std::to_string(port));
+
+  asio::error_code ec;
+  const tcp::resolver::iterator first = resolver.resolve(query, ec);
+  if (ec) {
+    return ToStatus(ec);
+  }
+
+  // A default-constructed resolver iterator is the end-of-sequence marker.
+  const tcp::resolver::iterator last;
+  std::vector<tcp::endpoint> servers(first, last);
+
+  Status stat = engine_.Connect(servers);
+  if (stat.ok()) {
+    engine_.Start();
+  }
+  return stat;
+}
+
+// Opens `path` for reading.
+//
+// Asks the namenode for the block locations of the whole file (offset 0,
+// maximum length), blocks the calling thread until the RPC completes, and
+// on success stores a new InputStreamImpl in *isptr; the caller owns it.
+//
+// Returns InvalidArgument when path or isptr is null, the RPC's own status
+// when the call fails, and InvalidArgument when the namenode answers
+// without a `locations` field (how it reports a path it cannot resolve).
+// *isptr is written only on success.
+Status FileSystemImpl::Open(const char *path, InputStream **isptr) {
+  using ::hadoop::hdfs::GetBlockLocationsRequestProto;
+  using ::hadoop::hdfs::GetBlockLocationsResponseProto;
+
+  if (!path || !isptr) {
+    return Status::InvalidArgument("path and isptr must not be null");
+  }
+
+  GetBlockLocationsRequestProto req;
+  auto resp = std::make_shared<GetBlockLocationsResponseProto>();
+  req.set_src(path);
+  req.set_offset(0);
+  req.set_length(std::numeric_limits<long long>::max());
+
+  // Bridge the asynchronous RPC to this synchronous call: the completion
+  // handler fulfils the promise and the caller waits on the future. The
+  // promise is shared so it outlives this frame if the handler runs late.
+  auto stat_p = std::make_shared<std::promise<Status>>();
+  std::future<Status> future(stat_p->get_future());
+  namenode_.GetBlockLocations(
+      &req, resp,
+      [stat_p](const Status &status) { stat_p->set_value(status); });
+  Status stat = future.get();
+  if (!stat.ok()) {
+    return stat;
+  }
+
+  // `locations` is optional in the response. Without this check
+  // resp->locations() would be the empty default message and Open() would
+  // hand out a zero-length stream for a file that could not be located.
+  if (!resp->has_locations()) {
+    return Status::InvalidArgument("Cannot find block locations of the file");
+  }
+
+  *isptr = new InputStreamImpl(this, &resp->locations());
+  return Status::OK();
+}
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.h
new file mode 100644
index 0000000..30536eb
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.h
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef FS_FILESYSTEM_H_
+#define FS_FILESYSTEM_H_
+
+#include "common/wrapper.h"
+#include "libhdfspp/hdfs.h"
+#include "rpc/rpc_engine.h"
+#include "ClientNamenodeProtocol.pb.h"
+#include "ClientNamenodeProtocol.hrpc.inl"
+
+namespace hdfs {
+
+/**
+ * FileSystem implementation that talks to a namenode through an RpcEngine.
+ */
+class FileSystemImpl : public FileSystem {
+ public:
+  // io_service must be an IoServiceImpl; the constructor downcasts it with
+  // an unchecked static_cast.
+  FileSystemImpl(IoService *io_service);
+  // Resolves server:port, connects the RPC engine and starts it.
+  Status Connect(const char *server, unsigned short port);
+  virtual Status Open(const char *path, InputStream **isptr) override;
+  // Gives InputStreamImpl access to the engine.
+  RpcEngine &rpc_engine() { return engine_; }
+ private:
+  // Declaration order matters: members are initialized in this order and
+  // the constructor builds engine_ from io_service_ and namenode_ from
+  // engine_.
+  IoServiceImpl *io_service_;
+  RpcEngine engine_;
+  ClientNamenodeProtocol namenode_;
+};
+
+/**
+ * InputStream implementation that reads file data block by block from
+ * datanodes.
+ */
+class InputStreamImpl : public InputStream {
+ public:
+  // Copies the file length and the located blocks out of *blocks.
+  InputStreamImpl(FileSystemImpl *fs, const ::hadoop::hdfs::LocatedBlocksProto 
*blocks);
+  // Synchronous wrapper around AsyncPreadSome().
+  virtual Status PositionRead(void *buf, size_t nbyte, size_t offset, size_t 
*read_bytes) override;
+  // Starts an asynchronous read at file position `offset` into `buffers`.
+  // The implementation invokes handler as handler(status, bytes), so
+  // Handler must be callable with (const Status &, size_t).
+  template<class MutableBufferSequence, class Handler>
+  void AsyncPreadSome(size_t offset, const MutableBufferSequence &buffers,
+                      const Handler &handler);
+ private:
+  // Non-owning pointer to the file system this stream was opened from.
+  FileSystemImpl *fs_;
+  unsigned long long file_length_;
+  std::vector<::hadoop::hdfs::LocatedBlockProto> blocks_;
+  // Pipeline stages used by AsyncPreadSome(); defined in
+  // inputstream_impl.h.
+  struct HandshakeContinuation;
+  template<class MutableBufferSequence>
+  struct ReadBlockContinuation;
+};
+
+}
+
+#include "inputstream_impl.h"
+
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream.cc
new file mode 100644
index 0000000..a41c684
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream.cc
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "filesystem.h"
+
+namespace hdfs {
+
+using ::hadoop::hdfs::LocatedBlocksProto;
+
+// Out-of-line definition of the virtual destructor declared in
+// libhdfspp/hdfs.h.
+InputStream::~InputStream()
+{}
+
+// Captures the file length and a private copy of every located block.
+// The separately reported last block is appended only when it is present
+// and holds at least one byte.
+InputStreamImpl::InputStreamImpl(FileSystemImpl *fs,
+                                 const LocatedBlocksProto *blocks)
+    : fs_(fs)
+    , file_length_(blocks->filelength())
+{
+  const auto &located = blocks->blocks();
+  blocks_.insert(blocks_.end(), located.begin(), located.end());
+
+  if (blocks->has_lastblock()) {
+    const auto &last = blocks->lastblock();
+    if (last.b().numbytes()) {
+      blocks_.push_back(last);
+    }
+  }
+}
+
+// Synchronous positional read: starts AsyncPreadSome() and blocks the
+// calling thread until its completion handler has run. The handler stores
+// the transferred byte count in *read_bytes (which must not be null) and
+// publishes the final status through a shared promise.
+Status InputStreamImpl::PositionRead(void *buf, size_t nbyte, size_t offset,
+                                     size_t *read_bytes) {
+  auto done = std::make_shared<std::promise<Status>>();
+  std::future<Status> result = done->get_future();
+
+  AsyncPreadSome(
+      offset, asio::buffer(buf, nbyte),
+      [done, read_bytes](const Status &status, size_t transferred) {
+        *read_bytes = transferred;
+        done->set_value(status);
+      });
+
+  return result.get();
+}
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h
new file mode 100644
index 0000000..88e1912
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef FS_INPUTSTREAM_IMPL_H_
+#define FS_INPUTSTREAM_IMPL_H_
+
+#include "reader/block_reader.h"
+
+#include "common/continuation/asio.h"
+#include "common/continuation/protobuf.h"
+
+#include <functional>
+#include <future>
+
+namespace hdfs {
+
+// Pipeline stage that performs the block-reader handshake with a datanode
+// by calling RemoteBlockReader::async_connect().
+struct InputStreamImpl::HandshakeContinuation : continuation::Continuation {
+  typedef RemoteBlockReader<::asio::ip::tcp::socket> Reader;
+  // The token (when given) and the block descriptor are copied into the
+  // continuation, so the caller's protobuf objects need not outlive it.
+  // reader is stored as a raw, non-owning pointer.
+  HandshakeContinuation(Reader *reader, const std::string &client_name,
+               const hadoop::common::TokenProto *token,
+               const hadoop::hdfs::ExtendedBlockProto *block,
+               uint64_t length, uint64_t offset)
+      : reader_(reader)
+      , client_name_(client_name)
+      , length_(length)
+      , offset_(offset)
+  {
+    if (token) {
+      token_.reset(new hadoop::common::TokenProto());
+      token_->CheckTypeAndMergeFrom(*token);
+    }
+    block_.CheckTypeAndMergeFrom(*block);
+  }
+
+  // Starts the handshake and passes `next` to the reader as the completion
+  // callback. token_.get() is null when no token was supplied.
+  virtual void Run(const Next& next) override {
+    reader_->async_connect(client_name_, token_.get(), &block_, length_, 
offset_, next);
+  }
+
+ private:
+  Reader *reader_;
+  const std::string client_name_;
+  std::unique_ptr<hadoop::common::TokenProto> token_;
+  hadoop::hdfs::ExtendedBlockProto block_;
+  uint64_t length_;
+  uint64_t offset_;
+};
+
+// Pipeline stage that keeps issuing async_read_some() on the block reader
+// until the buffer is full or a read reports an error. The running byte
+// count is written through `transferred`, a caller-provided location.
+template<class MutableBufferSequence>
+struct InputStreamImpl::ReadBlockContinuation : continuation::Continuation {
+  typedef RemoteBlockReader<::asio::ip::tcp::socket> Reader;
+  ReadBlockContinuation(Reader *reader, MutableBufferSequence buffer,
+                 size_t *transferred)
+      : reader_(reader)
+      , buffer_(buffer)
+      , buffer_size_(asio::buffer_size(buffer))
+      , transferred_(transferred)
+  {}
+
+  // Resets the byte counter, remembers the pipeline callback, and enters
+  // the read loop by reporting an empty, successful read.
+  virtual void Run(const Next& next) override {
+    *transferred_ = 0;
+    next_ = next;
+    OnReadData(Status::OK(), 0);
+  }
+
+ private:
+  Reader *reader_;
+  MutableBufferSequence buffer_;
+  const size_t buffer_size_;
+  size_t *transferred_;
+  std::function<void(const Status &)> next_;
+
+  // Completion handler of each read: adds the new bytes to the counter,
+  // then either finishes the stage (error, or buffer full) or requests the
+  // rest of the buffer.
+  // NOTE(review): the handler is bound to the raw `this` pointer, so this
+  // object has to stay alive until next_ has been invoked - presumably the
+  // pipeline guarantees that; confirm.
+  void OnReadData(const Status &status, size_t transferred) {
+    using std::placeholders::_1;
+    using std::placeholders::_2;
+    *transferred_ += transferred;
+    if (!status.ok()) {
+      next_(status);
+    } else if (*transferred_ >= buffer_size_) {
+      next_(status);
+    } else {
+      reader_->async_read_some(
+          asio::buffer(buffer_ + *transferred_, buffer_size_ - *transferred_),
+          std::bind(&ReadBlockContinuation::OnReadData, this, _1, _2));
+    }
+  }
+};
+
+
+// Starts an asynchronous read of at most asio::buffer_size(buffers) bytes
+// at file position `offset`.
+//
+// The read is served from the single block that contains `offset` and
+// never crosses a block boundary, so fewer bytes than requested may be
+// delivered. handler(status, bytes_transferred) is invoked exactly once:
+//  - immediately with InvalidArgument when no block covers `offset`;
+//  - immediately with ResourceUnavailable when the block lists no datanode
+//    or none of the listed datanode addresses can be parsed;
+//  - otherwise when the connect / handshake / read pipeline finishes.
+template<class MutableBufferSequence, class Handler>
+void InputStreamImpl::AsyncPreadSome(
+    size_t offset, const MutableBufferSequence &buffers,
+    const Handler &handler) {
+  using ::hadoop::hdfs::LocatedBlockProto;
+  namespace ip = ::asio::ip;
+  using ::asio::ip::tcp;
+
+  auto it = std::find_if(
+      blocks_.begin(), blocks_.end(),
+      [offset](const LocatedBlockProto &p) {
+        return p.offset() <= offset && offset < p.offset() + p.b().numbytes();
+      });
+
+  if (it == blocks_.end()) {
+    handler(Status::InvalidArgument("Cannot find corresponding blocks"), 0);
+    return;
+  } else if (!it->locs_size()) {
+    handler(Status::ResourceUnavailable("No datanodes available"), 0);
+    return;
+  }
+
+  uint64_t offset_within_block = offset - it->offset();
+  uint64_t size_within_block = std::min<uint64_t>(
+      it->b().numbytes() - offset_within_block, asio::buffer_size(buffers));
+
+  // Parse the datanode addresses before the pipeline exists so that every
+  // early return below leaves nothing behind. The error_code overload is
+  // used because the throwing overload would let an exception escape from
+  // a code path that otherwise reports failures through Status.
+  std::vector<tcp::endpoint> endpoints;
+  for (const auto &loc : it->locs()) {
+    const auto &datanode = loc.id();
+    asio::error_code ec;
+    const ip::address addr =
+        ip::address::from_string(datanode.ipaddr(), ec);
+    if (!ec) {
+      endpoints.push_back(tcp::endpoint(addr, datanode.xferport()));
+    }
+  }
+  if (endpoints.empty()) {
+    handler(Status::ResourceUnavailable("No datanodes available"), 0);
+    return;
+  }
+
+  struct State {
+    std::unique_ptr<tcp::socket> conn;
+    std::shared_ptr<RemoteBlockReader<tcp::socket> > reader;
+    LocatedBlockProto block;
+    std::vector<tcp::endpoint> endpoints;
+    // Reported to the handler even when an earlier stage fails and the
+    // read stage never runs, so it must start from a defined value.
+    size_t transferred = 0;
+  };
+
+  auto m = continuation::Pipeline<State>::Create();
+  auto &s = m->state();
+  s.conn.reset(new tcp::socket(fs_->rpc_engine().io_service()));
+  s.reader = std::make_shared<RemoteBlockReader<tcp::socket> >(
+      BlockReaderOptions(), s.conn.get());
+  s.block = *it;
+  s.endpoints.swap(endpoints);
+
+  // Stages: connect to one of the datanodes, perform the block-reader
+  // handshake, then read until the (block-limited) buffer is full.
+  m->Push(continuation::Connect(s.conn.get(), s.endpoints.begin(),
+                                s.endpoints.end()))
+      .Push(new HandshakeContinuation(
+          s.reader.get(), fs_->rpc_engine().client_name(), nullptr,
+          &s.block.b(), size_within_block, offset_within_block))
+      .Push(new ReadBlockContinuation<::asio::mutable_buffers_1>(
+          s.reader.get(), asio::buffer(buffers, size_within_block),
+          &s.transferred));
+
+  m->Run([handler](const Status &status, const State &state) {
+      handler(status, state.transferred);
+    });
+}
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_test.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_test.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_test.cc
new file mode 100644
index 0000000..dceac86
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_test.cc
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "libhdfspp/hdfs.h"
+
+#include <iostream>
+#include <string>
+#include <thread>
+
+using namespace hdfs;
+
+// Owns an IoService and runs its event loop on a dedicated thread for as
+// long as the Executor exists.
+class Executor {
+ public:
+  // thread_ is declared after io_service_, so the service already exists
+  // when the worker thread starts running it.
+  Executor()
+      : io_service_(IoService::New())
+      , thread_([this]() { io_service_->Run(); })
+  {}
+
+  // Stops the event loop, then waits for the worker thread to finish.
+  ~Executor() {
+    io_service_->Stop();
+    thread_.join();
+  }
+
+  IoService *io_service() { return io_service_.get(); }
+
+  std::unique_ptr<IoService> io_service_;
+  std::thread thread_;
+};
+
+// Connects to the namenode given on the command line, opens the named file
+// and writes up to the first 8192 bytes of it to stderr.
+//
+// Usage: <program> <nnhost> <nnport> <file>
+// Exits with 1 on a usage error or when connecting, opening or reading
+// fails, and with 0 otherwise.
+int main(int argc, char *argv[]) {
+  if (argc != 4) {
+    std::cerr
+        << "Print files stored in a HDFS cluster.\n"
+        << "Usage: " << argv[0] << " "
+        << "<nnhost> <nnport> <file>\n";
+    return 1;
+  }
+
+  // Declared first so that it is destroyed last: the event loop keeps
+  // running while the file system and the stream are torn down.
+  Executor executor;
+
+  // NOTE(review): std::stoi throws on a non-numeric port and the value is
+  // narrowed to unsigned short without a range check.
+  FileSystem *fsptr = nullptr;
+  Status stat = FileSystem::New(executor.io_service(), argv[1],
+                                std::stoi(argv[2]), &fsptr);
+  if (!stat.ok()) {
+    std::cerr << "Cannot create the filesystem: " << stat.ToString()
+              << std::endl;
+    return 1;
+  }
+
+  std::unique_ptr<FileSystem> fs(fsptr);
+
+  InputStream *isptr = nullptr;
+  stat = fs->Open(argv[3], &isptr);
+  if (!stat.ok()) {
+    std::cerr << "Cannot open the file: " << stat.ToString() << std::endl;
+    return 1;
+  }
+
+  std::unique_ptr<InputStream> is(isptr);
+
+  char buf[8192] = {0,};
+  size_t read_bytes = 0;
+  stat = is->PositionRead(buf, sizeof(buf), 0, &read_bytes);
+  if (!stat.ok()) {
+    std::cerr << "Read failures: " << stat.ToString() << std::endl;
+    return 1;
+  }
+
+  // The buffer is not NUL-terminated when the read fills it completely, so
+  // emit exactly read_bytes bytes instead of streaming it as a C string.
+  std::cerr << "Read bytes:" << read_bytes << std::endl;
+  std::cerr.write(buf, static_cast<std::streamsize>(read_bytes));
+  std::cerr << std::endl;
+  return 0;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/namenode_protocol.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/namenode_protocol.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/namenode_protocol.h
new file mode 100644
index 0000000..80aa237
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/namenode_protocol.h
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef FS_NAMENODE_PROTOCOL_H_
+#define FS_NAMENODE_PROTOCOL_H_
+
+#include "ClientNamenodeProtocol.pb.h"
+#include "rpc/rpc_engine.h"
+
+namespace hdfs {
+
+// Thin stub for namenode RPCs on top of an RpcEngine; the engine pointer
+// is stored without taking ownership.
+// NOTE(review): filesystem.cc calls namenode_.GetBlockLocations() with a
+// third (completion handler) argument and filesystem.h includes
+// ClientNamenodeProtocol.hrpc.inl rather than this header, so this class
+// does not appear to be the one used there - confirm.
+class ClientNamenodeProtocol {
+ public:
+  ClientNamenodeProtocol(RpcEngine *engine)
+      : engine_(engine)
+  {}
+
+  // Forwards the request to the engine as the "getBlockLocations" RPC and
+  // returns the Status reported by RpcEngine::Rpc().
+  Status GetBlockLocations(const ::hadoop::hdfs::GetBlockLocationsRequestProto 
*request,
+                           
std::shared_ptr<::hadoop::hdfs::GetBlockLocationsResponseProto> response) {
+    return engine_->Rpc("getBlockLocations", request, response);
+  }
+ private:
+  RpcEngine *engine_;
+};
+
+};
+
+#endif

Reply via email to