fs
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e6ca03c0 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e6ca03c0 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e6ca03c0 Branch: refs/heads/8b02d962b291afe4b08c47f0398c1db0709419a1 Commit: e6ca03c0357b4b9061349e9903a2064ee0c51715 Parents: f34bda7 Author: Haohui Mai <whe...@apache.org> Authored: Tue Jul 14 12:43:09 2015 -0700 Committer: Haohui Mai <whe...@apache.org> Committed: Tue Jul 14 12:47:00 2015 -0700 ---------------------------------------------------------------------- .../native/libhdfspp/include/libhdfspp/hdfs.h | 50 ++++++ .../main/native/libhdfspp/lib/common/hdfs.cc | 29 ++++ .../main/native/libhdfspp/lib/common/wrapper.h | 42 +++++ .../main/native/libhdfspp/lib/fs/CMakeLists.txt | 4 + .../main/native/libhdfspp/lib/fs/filesystem.cc | 95 +++++++++++ .../main/native/libhdfspp/lib/fs/filesystem.h | 61 +++++++ .../main/native/libhdfspp/lib/fs/inputstream.cc | 53 ++++++ .../native/libhdfspp/lib/fs/inputstream_impl.h | 160 +++++++++++++++++++ .../native/libhdfspp/lib/fs/inputstream_test.cc | 82 ++++++++++ .../native/libhdfspp/lib/fs/namenode_protocol.h | 42 +++++ 10 files changed, 618 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h new file mode 100644 index 0000000..d12f20e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBHDFSPP_HDFS_H_ +#define LIBHDFSPP_HDFS_H_ + +#include "libhdfspp/status.h" + +namespace hdfs { + +class IoService { + public: + static IoService *New(); + virtual void Run() = 0; + virtual void Stop() = 0; + virtual ~IoService(); +}; + + +class InputStream { + public: + virtual Status PositionRead(void *buf, size_t nbyte, size_t offset, size_t *read_bytes) = 0; + virtual ~InputStream(); +}; + +class FileSystem { + public: + static Status New(IoService *io_service, const char *server, + unsigned short port, FileSystem **fsptr); + virtual Status Open(const char *path, InputStream **isptr) = 0; + virtual ~FileSystem(); +}; + +} + +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/hdfs.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/hdfs.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/hdfs.cc new file mode 100644 index 0000000..e7a5d6c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/hdfs.cc @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "wrapper.h" + +namespace hdfs { + +IoService::~IoService() {} + +IoService *IoService::New() { + return new IoServiceImpl(); +} + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/wrapper.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/wrapper.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/wrapper.h new file mode 100644 index 0000000..39d26cc --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/wrapper.h @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef COMMON_WRAPPER_H_ +#define COMMON_WRAPPER_H_ + +#include "libhdfspp/hdfs.h" + +#include <asio/io_service.hpp> + +namespace hdfs { + +class IoServiceImpl : public IoService { + public: + virtual void Run() override { + asio::io_service::work work(io_service_); + io_service_.run(); + } + virtual void Stop() override { io_service_.stop(); } + ::asio::io_service &io_service() { return io_service_; } + private: + ::asio::io_service io_service_; +}; + +} + +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt new file mode 100644 index 0000000..bd649ff --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt @@ -0,0 +1,4 @@ +add_library(fs filesystem.cc inputstream.cc) +add_dependencies(fs proto) +add_executable(inputstream_test inputstream_test.cc) +target_link_libraries(inputstream_test common fs rpc reader proto ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT}) http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.cc new file mode 100644 index 0000000..ab322c6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.cc @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "filesystem.h" + +#include "common/util.h" + +#include <asio/ip/tcp.hpp> + +#include <limits> + +namespace hdfs { + +static const char kNamenodeProtocol[] = "org.apache.hadoop.hdfs.protocol.ClientProtocol"; +static const int kNamenodeProtocolVersion = 1; + +using ::asio::ip::tcp; + +FileSystem::~FileSystem() +{} + +Status FileSystem::New(IoService *io_service, const char *server, + unsigned short port, FileSystem **fsptr) { + std::unique_ptr<FileSystemImpl> impl(new FileSystemImpl(io_service)); + Status stat = impl->Connect(server, port); + if (stat.ok()) { + *fsptr = impl.release(); + } + return stat; +} + +FileSystemImpl::FileSystemImpl(IoService *io_service) + : io_service_(static_cast<IoServiceImpl*>(io_service)) + , engine_(&io_service_->io_service(), RpcEngine::GetRandomClientName(), + kNamenodeProtocol, kNamenodeProtocolVersion) + , namenode_(&engine_) +{} + +Status FileSystemImpl::Connect(const char *server, unsigned short port) { + asio::error_code ec; + tcp::resolver resolver(io_service_->io_service()); + tcp::resolver::query query(tcp::v4(), server, std::to_string(port)); + tcp::resolver::iterator iterator = resolver.resolve(query, ec); + + if (ec) { + return ToStatus(ec); + } + + std::vector<tcp::endpoint> servers(iterator, tcp::resolver::iterator()); + Status stat = engine_.Connect(servers); + if (!stat.ok()) { + return stat; + } + engine_.Start(); + return stat; +} + +Status FileSystemImpl::Open(const char *path, InputStream **isptr) { + using ::hadoop::hdfs::GetBlockLocationsRequestProto; + using ::hadoop::hdfs::GetBlockLocationsResponseProto; + + GetBlockLocationsRequestProto req; + auto resp = std::make_shared<GetBlockLocationsResponseProto>(); + req.set_src(path); + req.set_offset(0); + req.set_length(std::numeric_limits<long long>::max()); + auto stat_p = std::make_shared<std::promise<Status>>(); + std::future<Status> future(stat_p->get_future()); + namenode_.GetBlockLocations(&req, resp, + [stat_p](const Status &status) { stat_p->set_value(status); }); + Status stat = future.get(); + if (!stat.ok()) { + return stat; + } + + *isptr = new InputStreamImpl(this, &resp->locations()); + return Status::OK(); +} + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.h new file mode 100644 index 0000000..30536eb --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.h @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef FS_FILESYSTEM_H_ +#define FS_FILESYSTEM_H_ + +#include "common/wrapper.h" +#include "libhdfspp/hdfs.h" +#include "rpc/rpc_engine.h" +#include "ClientNamenodeProtocol.pb.h" +#include "ClientNamenodeProtocol.hrpc.inl" + +namespace hdfs { + +class FileSystemImpl : public FileSystem { + public: + FileSystemImpl(IoService *io_service); + Status Connect(const char *server, unsigned short port); + virtual Status Open(const char *path, InputStream **isptr) override; + RpcEngine &rpc_engine() { return engine_; } + private: + IoServiceImpl *io_service_; + RpcEngine engine_; + ClientNamenodeProtocol namenode_; +}; + +class InputStreamImpl : public InputStream { + public: + InputStreamImpl(FileSystemImpl *fs, const ::hadoop::hdfs::LocatedBlocksProto *blocks); + virtual Status PositionRead(void *buf, size_t nbyte, size_t offset, size_t *read_bytes) override; + template<class MutableBufferSequence, class Handler> + void AsyncPreadSome(size_t offset, const MutableBufferSequence &buffers, + const Handler &handler); + private: + FileSystemImpl *fs_; + unsigned long long file_length_; + std::vector<::hadoop::hdfs::LocatedBlockProto> blocks_; + struct HandshakeContinuation; + template<class MutableBufferSequence> + struct ReadBlockContinuation; +}; + +} + +#include "inputstream_impl.h" + +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream.cc new file mode 100644 index 0000000..a41c684 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream.cc @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "filesystem.h" + +namespace hdfs { + +using ::hadoop::hdfs::LocatedBlocksProto; + +InputStream::~InputStream() +{} + +InputStreamImpl::InputStreamImpl(FileSystemImpl *fs, const LocatedBlocksProto *blocks) + : fs_(fs) + , file_length_(blocks->filelength()) +{ + for (const auto &block : blocks->blocks()) { + blocks_.push_back(block); + } + + if (blocks->has_lastblock() && blocks->lastblock().b().numbytes()) { + blocks_.push_back(blocks->lastblock()); + } +} + +Status InputStreamImpl::PositionRead(void *buf, size_t nbyte, size_t offset, size_t *read_bytes) { + auto stat = std::make_shared<std::promise<Status>>(); + std::future<Status> future(stat->get_future()); + auto handler = [stat,read_bytes](const Status &status, size_t transferred) { + *read_bytes = transferred; + stat->set_value(status); + }; + + AsyncPreadSome(offset, asio::buffer(buf, nbyte), handler); + return future.get(); +} + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h new file mode 100644 index 0000000..88e1912 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef FS_INPUTSTREAM_IMPL_H_ +#define FS_INPUTSTREAM_IMPL_H_ + +#include "reader/block_reader.h" + +#include "common/continuation/asio.h" +#include "common/continuation/protobuf.h" + +#include <functional> +#include <future> + +namespace hdfs { + +struct InputStreamImpl::HandshakeContinuation : continuation::Continuation { + typedef RemoteBlockReader<::asio::ip::tcp::socket> Reader; + HandshakeContinuation(Reader *reader, const std::string &client_name, + const hadoop::common::TokenProto *token, + const hadoop::hdfs::ExtendedBlockProto *block, + uint64_t length, uint64_t offset) + : reader_(reader) + , client_name_(client_name) + , length_(length) + , offset_(offset) + { + if (token) { + token_.reset(new hadoop::common::TokenProto()); + token_->CheckTypeAndMergeFrom(*token); + } + block_.CheckTypeAndMergeFrom(*block); + } + + virtual void Run(const Next& next) override { + reader_->async_connect(client_name_, token_.get(), &block_, length_, offset_, next); + } + + private: + Reader *reader_; + const std::string client_name_; + std::unique_ptr<hadoop::common::TokenProto> token_; + hadoop::hdfs::ExtendedBlockProto block_; + uint64_t length_; + uint64_t offset_; +}; + +template<class MutableBufferSequence> +struct InputStreamImpl::ReadBlockContinuation : continuation::Continuation { + typedef RemoteBlockReader<::asio::ip::tcp::socket> Reader; + ReadBlockContinuation(Reader *reader, MutableBufferSequence buffer, + size_t *transferred) + : reader_(reader) + , buffer_(buffer) + , buffer_size_(asio::buffer_size(buffer)) + , transferred_(transferred) + {} + + virtual void Run(const Next& next) override { + *transferred_ = 0; + next_ = next; + OnReadData(Status::OK(), 0); + } + + private: + Reader *reader_; + MutableBufferSequence buffer_; + const size_t buffer_size_; + size_t *transferred_; + std::function<void(const Status &)> next_; + + void OnReadData(const Status &status, size_t transferred) { + using std::placeholders::_1; + using std::placeholders::_2; + *transferred_ += transferred; + if (!status.ok()) { + next_(status); + } else if (*transferred_ >= buffer_size_) { + next_(status); + } else { + reader_->async_read_some( + asio::buffer(buffer_ + *transferred_, buffer_size_ - *transferred_), + std::bind(&ReadBlockContinuation::OnReadData, this, _1, _2)); + } + } +}; + + +template<class MutableBufferSequence, class Handler> +void InputStreamImpl::AsyncPreadSome( + size_t offset, const MutableBufferSequence &buffers, + const Handler &handler) { + using ::hadoop::hdfs::LocatedBlockProto; + namespace ip = ::asio::ip; + using ::asio::ip::tcp; + + auto it = std::find_if( + blocks_.begin(), blocks_.end(), + [offset](const LocatedBlockProto &p) { + return p.offset() <= offset && offset < p.offset() + p.b().numbytes(); + }); + + if (it == blocks_.end()) { + handler(Status::InvalidArgument("Cannot find corresponding blocks"), 0); + return; + } else if (!it->locs_size()) { + handler(Status::ResourceUnavailable("No datanodes available"), 0); + return; + } + + uint64_t offset_within_block = offset - it->offset(); + uint64_t size_within_block = + std::min<uint64_t>(it->b().numbytes() - offset_within_block, asio::buffer_size(buffers)); + + struct State { + std::unique_ptr<tcp::socket> conn; + std::shared_ptr<RemoteBlockReader<tcp::socket> > reader; + LocatedBlockProto block; + std::vector<tcp::endpoint> endpoints; + size_t transferred; + }; + + auto m = continuation::Pipeline<State>::Create(); + auto &s = m->state(); + s.conn.reset(new tcp::socket(fs_->rpc_engine().io_service())); + s.reader = std::make_shared<RemoteBlockReader<tcp::socket> >(BlockReaderOptions(), s.conn.get()); + s.block = *it; + for (auto &loc : it->locs()) { + auto datanode = loc.id(); + s.endpoints.push_back(tcp::endpoint(ip::address::from_string(datanode.ipaddr()), datanode.xferport())); + } + + m->Push(continuation::Connect(s.conn.get(), s.endpoints.begin(), s.endpoints.end())) + .Push(new HandshakeContinuation(s.reader.get(), fs_->rpc_engine().client_name(), nullptr, + &s.block.b(), size_within_block, offset_within_block)) + .Push(new ReadBlockContinuation<::asio::mutable_buffers_1>( + s.reader.get(), asio::buffer(buffers, size_within_block), &s.transferred)); + + m->Run([handler](const Status &status, const State &state) { + handler(status, state.transferred); + }); +} + +} + +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_test.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_test.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_test.cc new file mode 100644 index 0000000..dceac86 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_test.cc @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "libhdfspp/hdfs.h" + +#include <iostream> +#include <string> +#include <thread> + +using namespace hdfs; + +class Executor { + public: + Executor() + : io_service_(IoService::New()) + , thread_(std::bind(&IoService::Run, io_service_.get())) + {} + + IoService *io_service() { return io_service_.get(); } + + ~Executor() { + io_service_->Stop(); + thread_.join(); + } + + std::unique_ptr<IoService> io_service_; + std::thread thread_; +}; + +int main(int argc, char *argv[]) { + if (argc != 4) { + std::cerr + << "Print files stored in a HDFS cluster.\n" + << "Usage: " << argv[0] << " " + << "<nnhost> <nnport> <file>\n"; + return 1; + } + + Executor executor; + + FileSystem *fsptr; + Status stat = FileSystem::New(executor.io_service(), argv[1], std::stoi(argv[2]), &fsptr); + if (!stat.ok()) { + std::cerr << "Cannot create the filesystem: " << stat.ToString() << std::endl; + return 1; + } + + std::unique_ptr<FileSystem> fs(fsptr); + + InputStream *isptr; + stat = fs->Open(argv[3], &isptr); + if (!stat.ok()) { + std::cerr << "Cannot open the file: " << stat.ToString() << std::endl; + return 1; + } + + std::unique_ptr<InputStream> is(isptr); + + char buf[8192] = {0,}; + size_t read_bytes = 0; + stat = is->PositionRead(buf, sizeof(buf), 0, &read_bytes); + if (!stat.ok()) { + std::cerr << "Read failures: " << stat.ToString() << std::endl; + } + std::cerr << "Read bytes:" << read_bytes << std::endl << buf << std::endl; + return 0; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/namenode_protocol.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/namenode_protocol.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/namenode_protocol.h new file mode 100644 index 0000000..80aa237 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/namenode_protocol.h @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef FS_NAMENODE_PROTOCOL_H_ +#define FS_NAMENODE_PROTOCOL_H_ + +#include "ClientNamenodeProtocol.pb.h" +#include "rpc/rpc_engine.h" + +namespace hdfs { + +class ClientNamenodeProtocol { + public: + ClientNamenodeProtocol(RpcEngine *engine) + : engine_(engine) + {} + + Status GetBlockLocations(const ::hadoop::hdfs::GetBlockLocationsRequestProto *request, + std::shared_ptr<::hadoop::hdfs::GetBlockLocationsResponseProto> response) { + return engine_->Rpc("getBlockLocations", request, response); + } + private: + RpcEngine *engine_; +}; + +}; + +#endif