Repository: hadoop Updated Branches: refs/heads/8b02d962b291afe4b08c47f0398c1db0709419a1 [created] 08e12b0cb
HDFS-8758. Implement the continuation library in libhdfspp. Contributed by Haohui Mai. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8b02d962 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8b02d962 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8b02d962 Branch: refs/heads/8b02d962b291afe4b08c47f0398c1db0709419a1 Commit: 8b02d962b291afe4b08c47f0398c1db0709419a1 Parents: ac60c6a Author: Haohui Mai <whe...@apache.org> Authored: Thu Jul 9 14:02:55 2015 -0700 Committer: Haohui Mai <whe...@apache.org> Committed: Fri Jul 10 17:09:01 2015 -0700 ---------------------------------------------------------------------- .../src/main/native/libhdfspp/doc/Doxyfile.in | 1 + .../libhdfspp/lib/common/continuation/asio.h | 112 ++++++++++++++++ .../lib/common/continuation/continuation.h | 125 ++++++++++++++++++ .../lib/common/continuation/protobuf.h | 128 +++++++++++++++++++ 4 files changed, 366 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b02d962/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/doc/Doxyfile.in ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/doc/Doxyfile.in b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/doc/Doxyfile.in index 773990f..ac1d0fb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/doc/Doxyfile.in +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/doc/Doxyfile.in @@ -8,6 +8,7 @@ BUILTIN_STL_SUPPORT = YES INPUT = @PROJECT_SOURCE_DIR@/doc/mainpage.dox \ @PROJECT_SOURCE_DIR@/include/libhdfspp \ + @PROJECT_SOURCE_DIR@/lib/common/continuation \ INPUT_ENCODING = UTF-8 RECURSIVE = NO http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b02d962/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/asio.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/asio.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/asio.h new file mode 100644 index 0000000..f7d76e8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/asio.h @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIB_COMMON_CONTINUATION_ASIO_H_ +#define LIB_COMMON_CONTINUATION_ASIO_H_ + +#include "continuation.h" +#include "common/util.h" + +#include "libhdfspp/status.h" + +#include <asio/connect.hpp> +#include <asio/read.hpp> +#include <asio/write.hpp> +#include <asio/ip/tcp.hpp> + +namespace hdfs { +namespace continuation { + +template <class Stream, class MutableBufferSequence> +class ReadContinuation : public Continuation { +public: + ReadContinuation(Stream *stream, const MutableBufferSequence &buffer) + : stream_(stream), buffer_(buffer) {} + virtual void Run(const Next &next) override { + auto handler = + [next](const asio::error_code &ec, size_t) { next(ToStatus(ec)); }; + asio::async_read(*stream_, buffer_, handler); + } + +private: + Stream *stream_; + MutableBufferSequence buffer_; +}; + +template <class Stream, class ConstBufferSequence> +class WriteContinuation : public Continuation { +public: + WriteContinuation(Stream *stream, const ConstBufferSequence &buffer) + : stream_(stream), buffer_(buffer) {} + + virtual void Run(const Next &next) override { + auto handler = + [next](const asio::error_code &ec, size_t) { next(ToStatus(ec)); }; + asio::async_write(*stream_, buffer_, handler); + } + +private: + Stream *stream_; + ConstBufferSequence buffer_; +}; + +template <class Socket, class Iterator> +class ConnectContinuation : public Continuation { +public: + ConnectContinuation(Socket *socket, Iterator begin, Iterator end, + Iterator *connected_endpoint) + : socket_(socket), begin_(begin), end_(end), + connected_endpoint_(connected_endpoint) {} + + virtual void Run(const Next &next) override { + auto handler = [this, next](const asio::error_code &ec, Iterator it) { + if (connected_endpoint_) { + *connected_endpoint_ = it; + } + next(ToStatus(ec)); + }; + asio::async_connect(*socket_, begin_, end_, handler); + } + +private: + Socket *socket_; + Iterator begin_; + Iterator end_; + Iterator *connected_endpoint_; +}; + +template <class Stream, class ConstBufferSequence> +static inline Continuation *Write(Stream *stream, + const ConstBufferSequence &buffer) { + return new WriteContinuation<Stream, ConstBufferSequence>(stream, buffer); +} + +template <class Stream, class MutableBufferSequence> +static inline Continuation *Read(Stream *stream, + const MutableBufferSequence &buffer) { + return new ReadContinuation<Stream, MutableBufferSequence>(stream, buffer); +} + +template <class Socket, class Iterator> +static inline Continuation *Connect(Socket *socket, Iterator begin, + Iterator end) { + return new ConnectContinuation<Socket, Iterator>(socket, begin, end, nullptr); +} +} +} + +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b02d962/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h new file mode 100644 index 0000000..9576c2f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIB_COMMON_CONTINUATION_CONTINUATION_H_ +#define LIB_COMMON_CONTINUATION_CONTINUATION_H_ + +#include "libhdfspp/status.h" + +#include <functional> +#include <memory> +#include <vector> + +namespace hdfs { +namespace continuation { + +class PipelineBase; + +/** + * A continuation is a fragment of runnable code whose execution will + * be scheduled by a \link Pipeline \endlink. + * + * The Continuation class is a build block to implement the + * Continuation Passing Style (CPS) in libhdfs++. In CPS, the + * upper-level user specifies the control flow by chaining a sequence + * of continuations explicitly through the \link Run() \endlink method, + * while in traditional imperative programming the sequences of + * sentences implicitly specify the control flow. + * + * See http://en.wikipedia.org/wiki/Continuation for more details. + **/ +class Continuation { +public: + typedef std::function<void(const Status &)> Next; + virtual ~Continuation() = default; + virtual void Run(const Next &next) = 0; + Continuation(const Continuation &) = delete; + Continuation &operator=(const Continuation &) = delete; + +protected: + Continuation() = default; +}; + +/** + * A pipeline schedules the execution of a chain of \link Continuation + * \endlink. The pipeline schedules the execution of continuations + * based on their order in the pipeline, where the next parameter for + * each continuation points to the \link Schedule() \endlink + * method. That way the pipeline executes all scheduled continuations + * in sequence. + * + * The typical use case of a pipeline is executing continuations + * asynchronously. Note that a continuation calls the next + * continuation when it is finished. If the continuation is posted + * into an asynchronous event loop, invoking the next continuation + * can be done in the callback handler in the asynchronous event loop. + * + * The pipeline allocates the memory as follows. A pipeline is always + * allocated on the heap. It owns all the continuations as well as the + * the state specified by the user. Both the continuations and the + * state have the same life cycle of the pipeline. The design + * simplifies the problem of ensuring that the executions in the + * asynchronous event loop always hold valid pointers w.r.t. the + * pipeline. The pipeline will automatically deallocate itself right + * after it invokes the callback specified the user. + **/ +template <class State> class Pipeline { +public: + typedef std::function<void(const Status &, const State &)> UserHandler; + static Pipeline *Create() { return new Pipeline(); } + Pipeline &Push(Continuation *stage); + void Run(UserHandler &&handler); + State &state() { return state_; } + +private: + State state_; + std::vector<std::unique_ptr<Continuation>> routines_; + size_t stage_; + std::function<void(const Status &, const State &)> handler_; + + Pipeline() : stage_(0) {} + ~Pipeline() = default; + void Schedule(const Status &status); +}; + +template <class State> +inline Pipeline<State> &Pipeline<State>::Push(Continuation *stage) { + routines_.emplace_back(std::unique_ptr<Continuation>(stage)); + return *this; +} + +template <class State> +inline void Pipeline<State>::Schedule(const Status &status) { + if (stage_ >= routines_.size()) { + handler_(status, state_); + routines_.clear(); + delete this; + } else { + auto next = routines_[stage_].get(); + ++stage_; + next->Run(std::bind(&Pipeline::Schedule, this, std::placeholders::_1)); + } +} + +template <class State> inline void Pipeline<State>::Run(UserHandler &&handler) { + handler_ = std::move(handler); + Schedule(Status::OK()); +} +} +} + +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b02d962/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h new file mode 100644 index 0000000..3e4b535 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBHDFSPP_COMMON_CONTINUATION_PROTOBUF_H_ +#define LIBHDFSPP_COMMON_CONTINUATION_PROTOBUF_H_ + +#include "common/util.h" + +#include <google/protobuf/message_lite.h> +#include <google/protobuf/io/coded_stream.h> +#include <google/protobuf/io/zero_copy_stream_impl_lite.h> + +#include <cassert> + +namespace hdfs { +namespace continuation { + +template <class Stream, size_t MaxMessageSize = 512> +struct ReadDelimitedPBMessageContinuation : public Continuation { + ReadDelimitedPBMessageContinuation(Stream *stream, + ::google::protobuf::MessageLite *msg) + : stream_(stream), msg_(msg) {} + + virtual void Run(const Next &next) override { + namespace pbio = google::protobuf::io; + auto handler = [this, next](const asio::error_code &ec, size_t) { + Status status; + if (ec) { + status = ToStatus(ec); + } else { + pbio::ArrayInputStream as(&buf_[0], buf_.size()); + pbio::CodedInputStream is(&as); + uint32_t size = 0; + bool v = is.ReadVarint32(&size); + assert(v); + is.PushLimit(size); + msg_->Clear(); + v = msg_->MergeFromCodedStream(&is); + assert(v); + } + next(status); + }; + asio::async_read( + *stream_, asio::buffer(buf_), + std::bind(&ReadDelimitedPBMessageContinuation::CompletionHandler, this, + std::placeholders::_1, std::placeholders::_2), + handler); + } + +private: + size_t CompletionHandler(const asio::error_code &ec, size_t transferred) { + if (ec) { + return 0; + } + + size_t offset = 0, len = 0; + for (size_t i = 0; i + 1 < transferred && i < sizeof(int); ++i) { + len = (len << 7) | (buf_[i] & 0x7f); + if ((uint8_t)buf_.at(i) < 0x80) { + offset = i + 1; + break; + } + } + + assert(offset + len < buf_.size() && "Message is too big"); + return offset ? len + offset - transferred : 1; + } + + Stream *stream_; + ::google::protobuf::MessageLite *msg_; + std::array<char, MaxMessageSize> buf_; +}; + +template <class Stream> +struct WriteDelimitedPBMessageContinuation : Continuation { + WriteDelimitedPBMessageContinuation(Stream *stream, + const google::protobuf::MessageLite *msg) + : stream_(stream), msg_(msg) {} + + virtual void Run(const Next &next) override { + namespace pbio = google::protobuf::io; + int size = msg_->ByteSize(); + buf_.reserve(pbio::CodedOutputStream::VarintSize32(size) + size); + pbio::StringOutputStream ss(&buf_); + pbio::CodedOutputStream os(&ss); + os.WriteVarint32(size); + msg_->SerializeToCodedStream(&os); + write_coroutine_ = + std::shared_ptr<Continuation>(Write(stream_, asio::buffer(buf_))); + write_coroutine_->Run([next](const Status &stat) { next(stat); }); + } + +private: + Stream *stream_; + const google::protobuf::MessageLite *msg_; + std::string buf_; + std::shared_ptr<Continuation> write_coroutine_; +}; + +template <class Stream, size_t MaxMessageSize = 512> +static inline Continuation * +ReadDelimitedPBMessage(Stream *stream, ::google::protobuf::MessageLite *msg) { + return new ReadDelimitedPBMessageContinuation<Stream, MaxMessageSize>(stream, + msg); +} + +template <class Stream> +static inline Continuation * +WriteDelimitedPBMessage(Stream *stream, ::google::protobuf::MessageLite *msg) { + return new WriteDelimitedPBMessageContinuation<Stream>(stream, msg); +} +} +} +#endif