http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-native-client/src/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/CMakeLists.txt index e3e6152..34b842f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/CMakeLists.txt @@ -84,7 +84,7 @@ endfunction() add_subdirectory(main/native/libhdfs) add_subdirectory(main/native/libhdfs-tests) - +add_subdirectory(main/native/libhdfspp) if(REQUIRE_LIBWEBHDFS) add_subdirectory(contrib/libwebhdfs)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/CMakeLists.txt new file mode 100644 index 0000000..17612cf --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/CMakeLists.txt @@ -0,0 +1,62 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +project (libhdfspp) + +enable_testing() + +find_package(Doxygen) +find_package(OpenSSL REQUIRED) +find_package(Protobuf REQUIRED) +find_package(Threads) + +add_definitions(-DASIO_STANDALONE -DASIO_CPP11_DATE_TIME) + +if(UNIX) +set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic -std=c++11 -g -fPIC -fno-strict-aliasing") +endif() + +# Mac OS 10.7 and later deprecates most of the methods in OpenSSL. +# Add -Wno-deprecated-declarations to avoid the warnings. 
+if(APPLE) +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -stdlib=libc++ -Wno-deprecated-declarations -Wno-unused-local-typedef") +endif() + +if(DOXYGEN_FOUND) +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/doc/Doxyfile.in ${CMAKE_CURRENT_BINARY_DIR}/doc/Doxyfile @ONLY) +add_custom_target(doc ${DOXYGEN_EXECUTABLE} ${CMAKE_CURRENT_BINARY_DIR}/doc/Doxyfile + WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR} + COMMENT "Generating API documentation with Doxygen" VERBATIM) +endif(DOXYGEN_FOUND) + +include_directories( + include + lib + ${PROJECT_BINARY_DIR}/lib/proto + third_party/asio-1.10.2/include + third_party/gmock-1.7.0 + ${OPENSSL_INCLUDE_DIR} +) + +set(PROTO_HDFS_DIR ${CMAKE_CURRENT_LIST_DIR}/../../../../../hadoop-hdfs-client/src/main/proto) +set(PROTO_HADOOP_DIR ${CMAKE_CURRENT_LIST_DIR}/../../../../../../hadoop-common-project/hadoop-common/src/main/proto) +set(PROTO_HADOOP_TEST_DIR ${CMAKE_CURRENT_LIST_DIR}/../../../../../../hadoop-common-project/hadoop-common/src/test/proto) + +add_subdirectory(third_party/gmock-1.7.0) +add_subdirectory(lib) +add_subdirectory(tests) http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/doc/Doxyfile.in ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/doc/Doxyfile.in b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/doc/Doxyfile.in new file mode 100644 index 0000000..ac1d0fb --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/doc/Doxyfile.in @@ -0,0 +1,17 @@ +DOXYFILE_ENCODING = UTF-8 +PROJECT_NAME = "libhdfspp" +OUTPUT_DIRECTORY = doc +TAB_SIZE = 2 +MARKDOWN_SUPPORT = YES +BUILTIN_STL_SUPPORT = YES + + +INPUT = @PROJECT_SOURCE_DIR@/doc/mainpage.dox \ + @PROJECT_SOURCE_DIR@/include/libhdfspp \ + @PROJECT_SOURCE_DIR@/lib/common/continuation \ + +INPUT_ENCODING = UTF-8 +RECURSIVE = NO + 
+GENERATE_HTML = YES +GENERATE_LATEX = NO http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/doc/mainpage.dox ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/doc/mainpage.dox b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/doc/mainpage.dox new file mode 100644 index 0000000..ef4ba26 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/doc/mainpage.dox @@ -0,0 +1,8 @@ +/** +\mainpage libhdfs++ + +libhdfs++ is a modern implementation of HDFS client in C++11. It is +optimized for the Massive Parallel Processing (MPP) applications that +access thousands of files concurrently in HDFS. + +*/ http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h new file mode 100644 index 0000000..a3b1853 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBHDFSPP_HDFS_H_ +#define LIBHDFSPP_HDFS_H_ + +#include "libhdfspp/options.h" +#include "libhdfspp/status.h" + +#include <functional> +#include <set> + +namespace hdfs { + +/** + * An IoService manages a queue of asynchronous tasks. All libhdfs++ + * operations are filed against a particular IoService. + * + * When an operation is queued into an IoService, the IoService will + * run the callback handler associated with the operation. Note that + * the IoService must be stopped before destructing the objects that + * file the operations. + * + * From an implementation point of view the IoService object wraps the + * ::asio::io_service objects. Please see the related documentation + * for more details. + **/ +class IoService { +public: + static IoService *New(); + /** + * Run the asynchronous tasks associated with this IoService. + **/ + virtual void Run() = 0; + /** + * Stop running asynchronous tasks associated with this IoService. + **/ + virtual void Stop() = 0; + virtual ~IoService(); +}; + +/** + * Applications opens an InputStream to read files in HDFS. + **/ +class InputStream { +public: + /** + * Read data from a specific position. The current implementation + * stops at the block boundary. + * + * @param buf the pointer to the buffer + * @param nbyte the size of the buffer + * @param offset the offset the file + * @param excluded_datanodes the UUID of the datanodes that should + * not be used in this read + * + * The handler returns the datanode that serves the block and the number of + * bytes has read. 
+ **/ + virtual void + PositionRead(void *buf, size_t nbyte, uint64_t offset, + const std::set<std::string> &excluded_datanodes, + const std::function<void(const Status &, const std::string &, + size_t)> &handler) = 0; + virtual ~InputStream(); +}; + +/** + * FileSystem implements APIs to interact with HDFS. + **/ +class FileSystem { +public: + /** + * Create a new instance of the FileSystem object. The call + * initializes the RPC connections to the NameNode and returns an + * FileSystem object. + **/ + static void + New(IoService *io_service, const Options &options, const std::string &server, + const std::string &service, + const std::function<void(const Status &, FileSystem *)> &handler); + /** + * Open a file on HDFS. The call issues an RPC to the NameNode to + * gather the locations of all blocks in the file and to return a + * new instance of the @ref InputStream object. + **/ + virtual void + Open(const std::string &path, + const std::function<void(const Status &, InputStream *)> &handler) = 0; + virtual ~FileSystem(); +}; +} + +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/options.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/options.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/options.h new file mode 100644 index 0000000..c39d04e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/options.h @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBHDFSPP_OPTIONS_H_ +#define LIBHDFSPP_OPTIONS_H_ + +namespace hdfs { + +/** + * Options to control the behavior of the libhdfspp library. + **/ +struct Options { + /** + * Time out of RPC requests in milliseconds. + * Default: 30000 + **/ + int rpc_timeout; + Options(); +}; +} +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/status.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/status.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/status.h new file mode 100644 index 0000000..fc5ea66 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/status.h @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBHDFSPP_STATUS_H_ +#define LIBHDFSPP_STATUS_H_ + +#include <string> +#include <system_error> + +namespace hdfs { + +class StatusHelper; +class Status { + public: + // Create a success status. + Status() : state_(NULL) { } + ~Status() { delete[] state_; } + explicit Status(int code, const char *msg); + + // Copy the specified status. + Status(const Status& s); + void operator=(const Status& s); + + // Return a success status. + static Status OK() { return Status(); } + static Status InvalidArgument(const char *msg) + { return Status(kInvalidArgument, msg); } + static Status ResourceUnavailable(const char *msg) + { return Status(kResourceUnavailable, msg); } + static Status Unimplemented() + { return Status(kUnimplemented, ""); } + static Status Exception(const char *expception_class_name, const char *error_message) + { return Status(kException, expception_class_name, error_message); } + static Status Error(const char *error_message) + { return Exception("Exception", error_message); } + + // Returns true iff the status indicates success. + bool ok() const { return (state_ == NULL); } + + // Return a string representation of this status suitable for printing. + // Returns the string "OK" for success. + std::string ToString() const; + + int code() const { + return (state_ == NULL) ? kOk : static_cast<int>(state_[4]); + } + + private: + // OK status has a NULL state_. Otherwise, state_ is a new[] array + // of the following form: + // state_[0..3] == length of message + // state_[4] == code + // state_[5..] 
== message + const char* state_; + + enum Code { + kOk = 0, + kInvalidArgument = static_cast<unsigned>(std::errc::invalid_argument), + kResourceUnavailable = static_cast<unsigned>(std::errc::resource_unavailable_try_again), + kUnimplemented = static_cast<unsigned>(std::errc::function_not_supported), + kException = 255, + }; + + explicit Status(int code, const char *msg1, const char *msg2); + static const char *CopyState(const char* s); + static const char *ConstructState(int code, const char *msg1, const char *msg2); +}; + +inline Status::Status(const Status& s) { + state_ = (s.state_ == NULL) ? NULL : CopyState(s.state_); +} + +inline void Status::operator=(const Status& s) { + // The following condition catches both aliasing (when this == &s), + // and the common case where both s and *this are ok. + if (state_ != s.state_) { + delete[] state_; + state_ = (s.state_ == NULL) ? NULL : CopyState(s.state_); + } +} + +} + +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/CMakeLists.txt new file mode 100644 index 0000000..a0e3379 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/CMakeLists.txt @@ -0,0 +1,23 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +add_subdirectory(common) +add_subdirectory(fs) +add_subdirectory(reader) +add_subdirectory(rpc) +add_subdirectory(proto) http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt new file mode 100644 index 0000000..b03f00b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt @@ -0,0 +1 @@ +add_library(common base64.cc options.cc status.cc sasl_digest_md5.cc hdfs_public_api.cc) http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/base64.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/base64.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/base64.cc new file mode 100644 index 0000000..f98fec5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/base64.cc @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "util.h" + +#include <array> +#include <functional> +#include <algorithm> + +namespace hdfs { + +std::string Base64Encode(const std::string &src) { + static const char kDictionary[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz" + "0123456789+/"; + + int encoded_size = (src.size() + 2) / 3 * 4; + std::string dst; + dst.reserve(encoded_size); + + size_t i = 0; + while (i + 3 < src.length()) { + const char *s = &src[i]; + const int r[4] = {s[0] >> 2, ((s[0] << 4) | (s[1] >> 4)) & 0x3f, + ((s[1] << 2) | (s[2] >> 6)) & 0x3f, s[2] & 0x3f}; + + std::transform(r, r + sizeof(r) / sizeof(int), std::back_inserter(dst), + [&r](unsigned char v) { return kDictionary[v]; }); + i += 3; + } + + size_t remained = src.length() - i; + const char *s = &src[i]; + + switch (remained) { + case 0: + break; + case 1: { + char padding[4] = {kDictionary[s[0] >> 2], kDictionary[(s[0] << 4) & 0x3f], + '=', '='}; + dst.append(padding, sizeof(padding)); + } break; + case 2: { + char padding[4] = {kDictionary[src[i] >> 2], + kDictionary[((s[0] << 4) | (s[1] >> 4)) & 0x3f], + kDictionary[(s[1] << 2) & 0x3f], '='}; + dst.append(padding, sizeof(padding)); + } break; + default: + assert("Unreachable"); + break; + } + return dst; +} + +} 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h new file mode 100644 index 0000000..5630934 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef LIB_COMMON_CONTINUATION_ASIO_H_ +#define LIB_COMMON_CONTINUATION_ASIO_H_ + +#include "continuation.h" +#include "common/util.h" + +#include "libhdfspp/status.h" + +#include <asio/connect.hpp> +#include <asio/read.hpp> +#include <asio/write.hpp> +#include <asio/ip/tcp.hpp> + +namespace hdfs { +namespace continuation { + +template <class Stream, class MutableBufferSequence> +class ReadContinuation : public Continuation { +public: + ReadContinuation(Stream *stream, const MutableBufferSequence &buffer) + : stream_(stream), buffer_(buffer) {} + virtual void Run(const Next &next) override { + auto handler = + [next](const asio::error_code &ec, size_t) { next(ToStatus(ec)); }; + asio::async_read(*stream_, buffer_, handler); + } + +private: + Stream *stream_; + MutableBufferSequence buffer_; +}; + +template <class Stream, class ConstBufferSequence> +class WriteContinuation : public Continuation { +public: + WriteContinuation(Stream *stream, const ConstBufferSequence &buffer) + : stream_(stream), buffer_(buffer) {} + + virtual void Run(const Next &next) override { + auto handler = + [next](const asio::error_code &ec, size_t) { next(ToStatus(ec)); }; + asio::async_write(*stream_, buffer_, handler); + } + +private: + Stream *stream_; + ConstBufferSequence buffer_; +}; + +template <class Socket, class Iterator> +class ConnectContinuation : public Continuation { +public: + ConnectContinuation(Socket *socket, Iterator begin, Iterator end, + Iterator *connected_endpoint) + : socket_(socket), begin_(begin), end_(end), + connected_endpoint_(connected_endpoint) {} + + virtual void Run(const Next &next) override { + auto handler = [this, next](const asio::error_code &ec, Iterator it) { + if (connected_endpoint_) { + *connected_endpoint_ = it; + } + next(ToStatus(ec)); + }; + asio::async_connect(*socket_, begin_, end_, handler); + } + +private: + Socket *socket_; + Iterator begin_; + Iterator end_; + Iterator *connected_endpoint_; +}; + +template <class OutputIterator> 
+class ResolveContinuation : public Continuation { +public: + ResolveContinuation(::asio::io_service *io_service, const std::string &server, + const std::string &service, OutputIterator result) + : resolver_(*io_service), query_(server, service), result_(result) {} + + virtual void Run(const Next &next) override { + using resolver = ::asio::ip::tcp::resolver; + auto handler = + [this, next](const asio::error_code &ec, resolver::iterator it) { + if (!ec) { + std::copy(it, resolver::iterator(), result_); + } + next(ToStatus(ec)); + }; + resolver_.async_resolve(query_, handler); + } + +private: + ::asio::ip::tcp::resolver resolver_; + ::asio::ip::tcp::resolver::query query_; + OutputIterator result_; +}; + +template <class Stream, class ConstBufferSequence> +static inline Continuation *Write(Stream *stream, + const ConstBufferSequence &buffer) { + return new WriteContinuation<Stream, ConstBufferSequence>(stream, buffer); +} + +template <class Stream, class MutableBufferSequence> +static inline Continuation *Read(Stream *stream, + const MutableBufferSequence &buffer) { + return new ReadContinuation<Stream, MutableBufferSequence>(stream, buffer); +} + +template <class Socket, class Iterator> +static inline Continuation *Connect(Socket *socket, Iterator begin, + Iterator end) { + return new ConnectContinuation<Socket, Iterator>(socket, begin, end, nullptr); +} + +template <class OutputIterator> +static inline Continuation * +Resolve(::asio::io_service *io_service, const std::string &server, + const std::string &service, OutputIterator result) { + return new ResolveContinuation<OutputIterator>(io_service, server, service, result); +} +} +} + +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h new file mode 100644 index 0000000..0af04a8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIB_COMMON_CONTINUATION_CONTINUATION_H_ +#define LIB_COMMON_CONTINUATION_CONTINUATION_H_ + +#include "libhdfspp/status.h" + +#include <functional> +#include <memory> +#include <vector> + +namespace hdfs { +namespace continuation { + +class PipelineBase; + +/** + * A continuation is a fragment of runnable code whose execution will + * be scheduled by a \link Pipeline \endlink. + * + * The Continuation class is a build block to implement the + * Continuation Passing Style (CPS) in libhdfs++. 
In CPS, the + * upper-level user specifies the control flow by chaining a sequence + * of continuations explicitly through the \link Run() \endlink method, + * while in traditional imperative programming the sequences of + * sentences implicitly specify the control flow. + * + * See http://en.wikipedia.org/wiki/Continuation for more details. + **/ +class Continuation { +public: + typedef std::function<void(const Status &)> Next; + virtual ~Continuation() = default; + virtual void Run(const Next &next) = 0; + Continuation(const Continuation &) = delete; + Continuation &operator=(const Continuation &) = delete; + +protected: + Continuation() = default; +}; + +/** + * A pipeline schedules the execution of a chain of \link Continuation + * \endlink. The pipeline schedules the execution of continuations + * based on their order in the pipeline, where the next parameter for + * each continuation points to the \link Schedule() \endlink + * method. That way the pipeline executes all scheduled continuations + * in sequence. + * + * The typical use case of a pipeline is executing continuations + * asynchronously. Note that a continuation calls the next + * continuation when it is finished. If the continuation is posted + * into an asynchronous event loop, invoking the next continuation + * can be done in the callback handler in the asynchronous event loop. + * + * The pipeline allocates the memory as follows. A pipeline is always + * allocated on the heap. It owns all the continuations as well as the + * the state specified by the user. Both the continuations and the + * state have the same life cycle of the pipeline. The design + * simplifies the problem of ensuring that the executions in the + * asynchronous event loop always hold valid pointers w.r.t. the + * pipeline. The pipeline will automatically deallocate itself right + * after it invokes the callback specified the user. 
+ **/ +template <class State> class Pipeline { +public: + typedef std::function<void(const Status &, const State &)> UserHandler; + static Pipeline *Create() { return new Pipeline(); } + Pipeline &Push(Continuation *stage); + void Run(UserHandler &&handler); + State &state() { return state_; } + +private: + State state_; + std::vector<std::unique_ptr<Continuation>> routines_; + size_t stage_; + std::function<void(const Status &, const State &)> handler_; + + Pipeline() : stage_(0) {} + ~Pipeline() = default; + void Schedule(const Status &status); +}; + +template <class State> +inline Pipeline<State> &Pipeline<State>::Push(Continuation *stage) { + routines_.emplace_back(std::unique_ptr<Continuation>(stage)); + return *this; +} + +template <class State> +inline void Pipeline<State>::Schedule(const Status &status) { + if (!status.ok() || stage_ >= routines_.size()) { + handler_(status, state_); + routines_.clear(); + delete this; + } else { + auto next = routines_[stage_].get(); + ++stage_; + next->Run(std::bind(&Pipeline::Schedule, this, std::placeholders::_1)); + } +} + +template <class State> inline void Pipeline<State>::Run(UserHandler &&handler) { + handler_ = std::move(handler); + Schedule(Status::OK()); +} + +template <class Handler> class BindContinuation : public Continuation { +public: + BindContinuation(const Handler &handler) : handler_(handler) {} + virtual void Run(const Next &next) override { handler_(next); } + +private: + Handler handler_; +}; + +template <class Handler> static inline Continuation *Bind(const Handler &handler) { + return new BindContinuation<Handler>(handler); +} +} +} + +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h new file mode 100644 index 0000000..d30322c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef LIBHDFSPP_COMMON_CONTINUATION_PROTOBUF_H_ +#define LIBHDFSPP_COMMON_CONTINUATION_PROTOBUF_H_ + +#include "common/util.h" + +#include <asio/read.hpp> + +#include <google/protobuf/message_lite.h> +#include <google/protobuf/io/coded_stream.h> +#include <google/protobuf/io/zero_copy_stream_impl_lite.h> + +#include <cassert> + +namespace hdfs { +namespace continuation { + +template <class Stream, size_t MaxMessageSize = 512> +struct ReadDelimitedPBMessageContinuation : public Continuation { + ReadDelimitedPBMessageContinuation(Stream *stream, + ::google::protobuf::MessageLite *msg) + : stream_(stream), msg_(msg) {} + + virtual void Run(const Next &next) override { + namespace pbio = google::protobuf::io; + auto handler = [this, next](const asio::error_code &ec, size_t) { + Status status; + if (ec) { + status = ToStatus(ec); + } else { + pbio::ArrayInputStream as(&buf_[0], buf_.size()); + pbio::CodedInputStream is(&as); + uint32_t size = 0; + bool v = is.ReadVarint32(&size); + assert(v); + is.PushLimit(size); + msg_->Clear(); + v = msg_->MergeFromCodedStream(&is); + assert(v); + } + next(status); + }; + asio::async_read( + *stream_, asio::buffer(buf_), + std::bind(&ReadDelimitedPBMessageContinuation::CompletionHandler, this, + std::placeholders::_1, std::placeholders::_2), + handler); + } + +private: + size_t CompletionHandler(const asio::error_code &ec, size_t transferred) { + if (ec) { + return 0; + } + + size_t offset = 0, len = 0; + for (size_t i = 0; i + 1 < transferred && i < sizeof(int); ++i) { + len = (len << 7) | (buf_[i] & 0x7f); + if ((uint8_t)buf_.at(i) < 0x80) { + offset = i + 1; + break; + } + } + + assert(offset + len < buf_.size() && "Message is too big"); + return offset ? 
len + offset - transferred : 1; + } + + Stream *stream_; + ::google::protobuf::MessageLite *msg_; + std::array<char, MaxMessageSize> buf_; +}; + +template <class Stream> +struct WriteDelimitedPBMessageContinuation : Continuation { + WriteDelimitedPBMessageContinuation(Stream *stream, + const google::protobuf::MessageLite *msg) + : stream_(stream), msg_(msg) {} + + virtual void Run(const Next &next) override { + namespace pbio = google::protobuf::io; + int size = msg_->ByteSize(); + buf_.reserve(pbio::CodedOutputStream::VarintSize32(size) + size); + pbio::StringOutputStream ss(&buf_); + pbio::CodedOutputStream os(&ss); + os.WriteVarint32(size); + msg_->SerializeToCodedStream(&os); + write_coroutine_ = + std::shared_ptr<Continuation>(Write(stream_, asio::buffer(buf_))); + write_coroutine_->Run([next](const Status &stat) { next(stat); }); + } + +private: + Stream *stream_; + const google::protobuf::MessageLite *msg_; + std::string buf_; + std::shared_ptr<Continuation> write_coroutine_; +}; + +template <class Stream, size_t MaxMessageSize = 512> +static inline Continuation * +ReadDelimitedPBMessage(Stream *stream, ::google::protobuf::MessageLite *msg) { + return new ReadDelimitedPBMessageContinuation<Stream, MaxMessageSize>(stream, + msg); +} + +template <class Stream> +static inline Continuation * +WriteDelimitedPBMessage(Stream *stream, ::google::protobuf::MessageLite *msg) { + return new WriteDelimitedPBMessageContinuation<Stream>(stream, msg); +} +} +} +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc new file mode 100644 index 0000000..3192614 --- /dev/null +++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "hdfs_public_api.h" + +namespace hdfs { + +IoService::~IoService() {} + +IoService *IoService::New() { + return new IoServiceImpl(); +} + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.h new file mode 100644 index 0000000..95567c0 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.h @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef COMMON_HDFS_PUBLIC_API_H_ +#define COMMON_HDFS_PUBLIC_API_H_ + +#include "libhdfspp/hdfs.h" + +#include <asio/io_service.hpp> + +namespace hdfs { + +class IoServiceImpl : public IoService { + public: + virtual void Run() override { + asio::io_service::work work(io_service_); + io_service_.run(); + } + virtual void Stop() override { io_service_.stop(); } + ::asio::io_service &io_service() { return io_service_; } + private: + ::asio::io_service io_service_; +}; + +} + +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h new file mode 100644 index 0000000..82bdae0 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef LIB_COMMON_LOGGING_H_ +#define LIB_COMMON_LOGGING_H_ + +#include <iostream> + +namespace hdfs { + +enum LogLevel { + kDebug, + kInfo, + kWarning, + kError, +}; + +#define LOG_DEBUG() LogMessage(kDebug) +#define LOG_INFO() LogMessage(kInfo) +#define LOG_WARN() LogMessage(kWarning) +#define LOG_ERROR() LogMessage(kError) + +class LogMessage { + public: + LogMessage(const LogLevel &l) { + static constexpr const char * kLogLevelMessage[] = {"DEBUG", "INFO", "WARN", "ERROR"}; + ::std::cerr << "[" << kLogLevelMessage[(size_t)l] << "] "; + } + + ~LogMessage() { + ::std::cerr << std::endl; + } + + LogMessage& operator<<(const std::string& msg) { + ::std::cerr << msg; + return *this; + } + LogMessage& operator<<(int x) { + ::std::cerr << x; + return *this; + } +}; + +} + +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc new file mode 100644 index 0000000..529fd0b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc @@ -0,0 +1,27 @@ +/** + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "libhdfspp/options.h" + +namespace hdfs { + +Options::Options() + : rpc_timeout(30000) +{} + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/sasl_authenticator.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/sasl_authenticator.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/sasl_authenticator.h new file mode 100644 index 0000000..71fee7a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/sasl_authenticator.h @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIB_COMMON_SASL_AUTHENTICATOR_H_ +#define LIB_COMMON_SASL_AUTHENTICATOR_H_ + +#include "libhdfspp/status.h" + +namespace hdfs { + +class DigestMD5AuthenticatorTest_TestResponse_Test; + +/** + * A specialized implementation of RFC 2831 for the HDFS + * DataTransferProtocol. + * + * The current lacks the following features: + * * Encoding the username, realm, and password in ISO-8859-1 when + * it is required by the RFC. They are always encoded in UTF-8. + * * Checking whether the challenges from the server are + * well-formed. + * * Specifying authzid, digest-uri and maximum buffer size. + * * Supporting QOP other than the auth level. 
+ **/ +class DigestMD5Authenticator { +public: + Status EvaluateResponse(const std::string &payload, std::string *result); + DigestMD5Authenticator(const std::string &username, + const std::string &password, bool mock_nonce = false); + +private: + Status GenerateFirstResponse(std::string *result); + Status GenerateResponseValue(std::string *response_value); + Status ParseFirstChallenge(const std::string &payload); + + static size_t NextToken(const std::string &payload, size_t off, + std::string *tok); + void GenerateCNonce(); + std::string username_; + std::string password_; + std::string nonce_; + std::string cnonce_; + std::string realm_; + std::string qop_; + unsigned nonce_count_; + + const bool TEST_mock_cnonce_; + friend class DigestMD5AuthenticatorTest_TestResponse_Test; +}; +} + +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/sasl_digest_md5.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/sasl_digest_md5.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/sasl_digest_md5.cc new file mode 100644 index 0000000..3ca8578 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/sasl_digest_md5.cc @@ -0,0 +1,240 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "sasl_authenticator.h" + +#include "common/util.h" + +#include <openssl/rand.h> +#include <openssl/md5.h> + +#include <iomanip> +#include <map> +#include <sstream> + +namespace hdfs { + +static std::string QuoteString(const std::string &src); +static std::string GetMD5Digest(const std::string &src); +static std::string BinaryToHex(const std::string &src); + +static const char kDigestUri[] = "hdfs/0"; +static const size_t kMaxBufferSize = 65536; + +DigestMD5Authenticator::DigestMD5Authenticator(const std::string &username, + const std::string &password, + bool mock_nonce) + : username_(username), password_(password), nonce_count_(0), + TEST_mock_cnonce_(mock_nonce) {} + +Status DigestMD5Authenticator::EvaluateResponse(const std::string &payload, + std::string *result) { + Status status = ParseFirstChallenge(payload); + if (status.ok()) { + status = GenerateFirstResponse(result); + } + return status; +} + +size_t DigestMD5Authenticator::NextToken(const std::string &payload, size_t off, + std::string *tok) { + tok->clear(); + if (off >= payload.size()) { + return std::string::npos; + } + + char c = payload[off]; + if (c == '=' || c == ',') { + *tok = c; + return off + 1; + } + + int quote_count = 0; + for (; off < payload.size(); ++off) { + char c = payload[off]; + if (c == '"') { + ++quote_count; + if (quote_count == 2) { + return off + 1; + } + continue; + } + + if (c == '=') { + if (quote_count) { + tok->append(&c, 1); + } else { + break; + } + } else if (('0' <= c && c <= '9') || ('a' <= c && c <= 'z') || + ('A' <= c && c <= 'Z') || c 
== '+' || c == '/' || c == '-' || + c == '_' || c == '@') { + tok->append(&c, 1); + } else { + break; + } + } + return off; +} + +void DigestMD5Authenticator::GenerateCNonce() { + if (!TEST_mock_cnonce_) { + char buf[8] = {0,}; + RAND_pseudo_bytes(reinterpret_cast<unsigned char *>(buf), sizeof(buf)); + cnonce_ = Base64Encode(std::string(buf, sizeof(buf))); + } +} + +Status DigestMD5Authenticator::ParseFirstChallenge(const std::string &payload) { + std::map<std::string, std::string> props; + std::string token; + enum { + kStateLVal, + kStateEqual, + kStateRVal, + kStateCommaOrEnd, + }; + + int state = kStateLVal; + + std::string lval, rval; + size_t off = 0; + while (true) { + off = NextToken(payload, off, &token); + if (off == std::string::npos) { + break; + } + + switch (state) { + case kStateLVal: + lval = token; + state = kStateEqual; + break; + case kStateEqual: + state = kStateRVal; + break; + case kStateRVal: + rval = token; + props[lval] = rval; + state = kStateCommaOrEnd; + break; + case kStateCommaOrEnd: + state = kStateLVal; + break; + } + } + + if (props["algorithm"] != "md5-sess" || props["charset"] != "utf-8" || + props.find("nonce") == props.end()) { + return Status::Error("Invalid challenge"); + } + realm_ = props["realm"]; + nonce_ = props["nonce"]; + qop_ = props["qop"]; + return Status::OK(); +} + +Status DigestMD5Authenticator::GenerateFirstResponse(std::string *result) { + // TODO: Support auth-int and auth-conf + // Handle cipher + if (qop_ != "auth") { + return Status::Unimplemented(); + } + + std::stringstream ss; + GenerateCNonce(); + ss << "charset=utf-8,username=\"" << QuoteString(username_) << "\"" + << ",authzid=\"" << QuoteString(username_) << "\"" + << ",nonce=\"" << QuoteString(nonce_) << "\"" + << ",digest-uri=\"" << kDigestUri << "\"" + << ",maxbuf=" << kMaxBufferSize << ",cnonce=\"" << cnonce_ << "\""; + + if (realm_.size()) { + ss << ",realm=\"" << QuoteString(realm_) << "\""; + } + + ss << ",nc=" << std::hex << std::setw(8) << 
std::setfill('0') + << ++nonce_count_; + std::string response_value; + GenerateResponseValue(&response_value); + ss << ",response=" << response_value; + *result = ss.str(); + return result->size() > 4096 ? Status::Error("Response too big") + : Status::OK(); +} + +/** + * Generate the response value specified in S 2.1.2.1 in RFC2831. + **/ +Status +DigestMD5Authenticator::GenerateResponseValue(std::string *response_value) { + std::stringstream begin_a1, a1_ss; + std::string a1, a2; + + if (qop_ == "auth") { + a2 = std::string("AUTHENTICATE:") + kDigestUri; + } else { + a2 = std::string("AUTHENTICATE:") + kDigestUri + + ":00000000000000000000000000000000"; + } + + begin_a1 << username_ << ":" << realm_ << ":" << password_; + a1_ss << GetMD5Digest(begin_a1.str()) << ":" << nonce_ << ":" << cnonce_ + << ":" << username_; + + std::stringstream combine_ss; + combine_ss << BinaryToHex(GetMD5Digest(a1_ss.str())) << ":" << nonce_ << ":" + << std::hex << std::setw(8) << std::setfill('0') << nonce_count_ + << ":" << cnonce_ << ":" << qop_ << ":" + << BinaryToHex(GetMD5Digest(a2)); + *response_value = BinaryToHex(GetMD5Digest(combine_ss.str())); + return Status::OK(); +} + +static std::string QuoteString(const std::string &src) { + std::string dst; + dst.resize(2 * src.size()); + size_t j = 0; + for (size_t i = 0; i < src.size(); ++i) { + if (src[i] == '"') { + dst[j++] = '\\'; + } + dst[j++] = src[i]; + } + dst.resize(j); + return dst; +} + +static std::string GetMD5Digest(const std::string &src) { + MD5_CTX ctx; + unsigned long long res[2]; + MD5_Init(&ctx); + MD5_Update(&ctx, src.c_str(), src.size()); + MD5_Final(reinterpret_cast<unsigned char *>(res), &ctx); + return std::string(reinterpret_cast<char *>(res), sizeof(res)); +} + +static std::string BinaryToHex(const std::string &src) { + std::stringstream ss; + ss << std::hex << std::setfill('0'); + for (size_t i = 0; i < src.size(); ++i) { + unsigned c = (unsigned)(static_cast<unsigned char>(src[i])); + ss << std::setw(2) 
<< c; + } + return ss.str(); +} +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc new file mode 100644 index 0000000..66cfa1c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "libhdfspp/status.h" + +#include <cassert> +#include <cstring> + +namespace hdfs { + +Status::Status(int code, const char *msg1) + : state_(ConstructState(code, msg1, nullptr)) {} + +Status::Status(int code, const char *msg1, const char *msg2) + : state_(ConstructState(code, msg1, msg2)) {} + +const char *Status::ConstructState(int code, const char *msg1, + const char *msg2) { + assert(code != kOk); + const uint32_t len1 = strlen(msg1); + const uint32_t len2 = msg2 ? strlen(msg2) : 0; + const uint32_t size = len1 + (len2 ? 
(2 + len2) : 0); + char *result = new char[size + 8 + 2]; + *reinterpret_cast<uint32_t *>(result) = size; + *reinterpret_cast<uint32_t *>(result + 4) = code; + memcpy(result + 8, msg1, len1); + if (len2) { + result[8 + len1] = ':'; + result[9 + len1] = ' '; + memcpy(result + 10 + len1, msg2, len2); + } + return result; +} + +std::string Status::ToString() const { + if (!state_) { + return "OK"; + } else { + uint32_t length = *reinterpret_cast<const uint32_t *>(state_); + return std::string(state_ + 8, length); + } +} + +const char *Status::CopyState(const char *state) { + uint32_t size; + memcpy(&size, state, sizeof(size)); + char *result = new char[size + 8]; + memcpy(result, state, size + 8); + return result; +} +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h new file mode 100644 index 0000000..ff9f36c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIB_COMMON_UTIL_H_ +#define LIB_COMMON_UTIL_H_ + +#include "libhdfspp/status.h" + +#include <asio/error_code.hpp> + +#include <google/protobuf/message_lite.h> +#include <google/protobuf/io/coded_stream.h> + +namespace hdfs { + +static inline Status ToStatus(const ::asio::error_code &ec) { + if (ec) { + return Status(ec.value(), ec.message().c_str()); + } else { + return Status::OK(); + } +} + +static inline int DelimitedPBMessageSize( + const ::google::protobuf::MessageLite *msg) { + size_t size = msg->ByteSize(); + return ::google::protobuf::io::CodedOutputStream::VarintSize32(size) + size; +} + +static inline void ReadDelimitedPBMessage( + ::google::protobuf::io::CodedInputStream *in, + ::google::protobuf::MessageLite *msg) { + uint32_t size = 0; + in->ReadVarint32(&size); + auto limit = in->PushLimit(size); + msg->ParseFromCodedStream(in); + in->PopLimit(limit); +} + +std::string Base64Encode(const std::string &src); + +} + +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt new file mode 100644 index 0000000..f386688 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt @@ -0,0 +1,2 @@ 
+add_library(fs filesystem.cc inputstream.cc) +add_dependencies(fs proto) http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc new file mode 100644 index 0000000..0b958a8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "filesystem.h" +#include "common/continuation/asio.h" +#include "common/util.h" + +#include <asio/ip/tcp.hpp> + +#include <limits> + +namespace hdfs { + +static const char kNamenodeProtocol[] = + "org.apache.hadoop.hdfs.protocol.ClientProtocol"; +static const int kNamenodeProtocolVersion = 1; + +using ::asio::ip::tcp; + +FileSystem::~FileSystem() {} + +void FileSystem::New( + IoService *io_service, const Options &options, const std::string &server, + const std::string &service, + const std::function<void(const Status &, FileSystem *)> &handler) { + FileSystemImpl *impl = new FileSystemImpl(io_service, options); + impl->Connect(server, service, [impl, handler](const Status &stat) { + if (stat.ok()) { + handler(stat, impl); + } else { + delete impl; + handler(stat, nullptr); + } + }); +} + +FileSystemImpl::FileSystemImpl(IoService *io_service, const Options &options) + : io_service_(static_cast<IoServiceImpl *>(io_service)), + engine_(&io_service_->io_service(), options, + RpcEngine::GetRandomClientName(), kNamenodeProtocol, + kNamenodeProtocolVersion), + namenode_(&engine_) {} + +void FileSystemImpl::Connect(const std::string &server, + const std::string &service, + std::function<void(const Status &)> &&handler) { + using namespace continuation; + typedef std::vector<tcp::endpoint> State; + auto m = Pipeline<State>::Create(); + m->Push(Resolve(&io_service_->io_service(), server, service, + std::back_inserter(m->state()))) + .Push(Bind([this, m](const Continuation::Next &next) { + engine_.Connect(m->state().front(), next); + })); + m->Run([this, handler](const Status &status, const State &) { + if (status.ok()) { + engine_.Start(); + } + handler(status); + }); +} + +void FileSystemImpl::Open( + const std::string &path, + const std::function<void(const Status &, InputStream *)> &handler) { + using ::hadoop::hdfs::GetBlockLocationsRequestProto; + using ::hadoop::hdfs::GetBlockLocationsResponseProto; + + struct State { + GetBlockLocationsRequestProto 
req; + std::shared_ptr<GetBlockLocationsResponseProto> resp; + }; + + auto m = continuation::Pipeline<State>::Create(); + auto &req = m->state().req; + req.set_src(path); + req.set_offset(0); + req.set_length(std::numeric_limits<long long>::max()); + m->state().resp.reset(new GetBlockLocationsResponseProto()); + + State *s = &m->state(); + m->Push(continuation::Bind( + [this, s](const continuation::Continuation::Next &next) { + namenode_.GetBlockLocations(&s->req, s->resp, next); + })); + m->Run([this, handler](const Status &stat, const State &s) { + handler(stat, stat.ok() ? new InputStreamImpl(this, &s.resp->locations()) + : nullptr); + }); +} +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h new file mode 100644 index 0000000..72f80b7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef LIBHDFSPP_LIB_FS_FILESYSTEM_H_
#define LIBHDFSPP_LIB_FS_FILESYSTEM_H_

#include "common/hdfs_public_api.h"
#include "libhdfspp/hdfs.h"
#include "rpc/rpc_engine.h"
#include "ClientNamenodeProtocol.pb.h"
#include "ClientNamenodeProtocol.hrpc.inl"

namespace hdfs {

// Concrete FileSystem backed by the generated ClientNamenodeProtocol RPC
// stub.  All operations are asynchronous: results are delivered through the
// handler callbacks rather than by blocking the caller.
class FileSystemImpl : public FileSystem {
public:
  FileSystemImpl(IoService *io_service, const Options &options);
  // Establishes the RPC connection to the NameNode at server:service and
  // invokes handler with the connection status.
  void Connect(const std::string &server, const std::string &service,
               std::function<void(const Status &)> &&handler);
  // Asks the NameNode for the block locations of |path| and, on success,
  // passes a newly allocated InputStream to the handler.  NOTE(review):
  // the callback appears to transfer ownership of the stream to the
  // caller — confirm against the public API contract.
  virtual void Open(const std::string &path,
                    const std::function<void(const Status &, InputStream *)>
                        &handler) override;
  RpcEngine &rpc_engine() { return engine_; }

private:
  IoServiceImpl *io_service_; // not owned
  RpcEngine engine_;
  ClientNamenodeProtocol namenode_; // generated hrpc stub driving engine_
};

// Read-only stream over a single HDFS file.  Holds a snapshot of the
// file's located blocks taken when the stream was opened.
class InputStreamImpl : public InputStream {
public:
  InputStreamImpl(FileSystemImpl *fs,
                  const ::hadoop::hdfs::LocatedBlocksProto *blocks);
  // Reads up to nbyte bytes at the absolute file offset, skipping any
  // datanode whose uuid appears in excluded_datanodes.  The handler
  // receives the status, the uuid of the datanode actually used, and the
  // number of bytes transferred.
  virtual void
  PositionRead(void *buf, size_t nbyte, uint64_t offset,
               const std::set<std::string> &excluded_datanodes,
               const std::function<void(const Status &, const std::string &,
                                        size_t)> &handler) override;
  // Reads from the single block containing |offset|; the read is clamped
  // to that block's end, so fewer bytes than requested may be returned.
  template <class MutableBufferSequence, class Handler>
  void AsyncPreadSome(size_t offset, const MutableBufferSequence &buffers,
                      const std::set<std::string> &excluded_datanodes,
                      const Handler &handler);
  // Connects to datanode |dn|, performs the block-reader handshake, and
  // reads |buffers| worth of data from |block| starting at the
  // block-relative |offset|.
  template <class BlockReaderTrait, class MutableBufferSequence, class Handler>
  void AsyncReadBlock(const std::string &client_name,
                      const hadoop::hdfs::LocatedBlockProto &block,
                      const hadoop::hdfs::DatanodeInfoProto &dn, size_t offset,
                      const MutableBufferSequence &buffers,
                      const Handler &handler);

private:
  FileSystemImpl *fs_; // not owned; must outlive the stream
  unsigned long long file_length_;
  std::vector<::hadoop::hdfs::LocatedBlockProto> blocks_;
  // Continuation helpers; definitions live in inputstream_impl.h.
  template <class Reader> struct HandshakeContinuation;
  template <class Reader, class MutableBufferSequence>
  struct ReadBlockContinuation;
  struct RemoteBlockReaderTrait;
};
}

// Template member definitions must be visible to every includer.
#include "inputstream_impl.h"

#endif

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
+ */ + +#include "filesystem.h" + +namespace hdfs { + +using ::hadoop::hdfs::LocatedBlocksProto; + +InputStream::~InputStream() {} + +InputStreamImpl::InputStreamImpl(FileSystemImpl *fs, + const LocatedBlocksProto *blocks) + : fs_(fs), file_length_(blocks->filelength()) { + for (const auto &block : blocks->blocks()) { + blocks_.push_back(block); + } + + if (blocks->has_lastblock() && blocks->lastblock().b().numbytes()) { + blocks_.push_back(blocks->lastblock()); + } +} + +void InputStreamImpl::PositionRead( + void *buf, size_t nbyte, uint64_t offset, + const std::set<std::string> &excluded_datanodes, + const std::function<void(const Status &, const std::string &, size_t)> + &handler) { + AsyncPreadSome(offset, asio::buffer(buf, nbyte), excluded_datanodes, handler); +} +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h new file mode 100644 index 0000000..2044f3f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h @@ -0,0 +1,193 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef FS_INPUTSTREAM_IMPL_H_
#define FS_INPUTSTREAM_IMPL_H_

#include "reader/block_reader.h"

#include "common/continuation/asio.h"
#include "common/continuation/protobuf.h"

#include <functional>
#include <future>
#include <type_traits>

namespace hdfs {

// Glue that tells the read pipeline how to talk to a remote datanode over a
// plain TCP socket.  State owns the socket and the block reader for the
// lifetime of one pipeline run.
struct InputStreamImpl::RemoteBlockReaderTrait {
  typedef RemoteBlockReader<asio::ip::tcp::socket> Reader;
  struct State {
    std::unique_ptr<asio::ip::tcp::socket> conn_;
    std::unique_ptr<Reader> reader_;
    // Exactly one candidate endpoint: the chosen datanode's transfer port.
    std::array<asio::ip::tcp::endpoint, 1> endpoints_;
    size_t transferred_;
    Reader *reader() { return reader_.get(); }
    size_t *transferred() { return &transferred_; }
    const size_t *transferred() const { return &transferred_; }
  };
  // Builds a pipeline whose first stage connects the socket to |dn|'s xfer
  // port; the caller pushes further stages before running it.
  static continuation::Pipeline<State> *
  CreatePipeline(::asio::io_service *io_service,
                 const ::hadoop::hdfs::DatanodeInfoProto &dn) {
    using namespace ::asio::ip;
    auto m = continuation::Pipeline<State>::Create();
    auto &s = m->state();
    s.conn_.reset(new tcp::socket(*io_service));
    s.reader_.reset(new Reader(BlockReaderOptions(), s.conn_.get()));
    auto datanode = dn.id();
    s.endpoints_[0] = tcp::endpoint(address::from_string(datanode.ipaddr()),
                                    datanode.xferport());

    m->Push(continuation::Connect(s.conn_.get(), s.endpoints_.begin(),
                                  s.endpoints_.end()));
    return m;
  }
};

// Pipeline stage performing the block-reader handshake with the datanode.
// Copies the token and block protos so they stay valid even if the caller's
// objects are destroyed before the stage runs.
template <class Reader>
struct InputStreamImpl::HandshakeContinuation : continuation::Continuation {
  HandshakeContinuation(Reader *reader, const std::string &client_name,
                        const hadoop::common::TokenProto *token,
                        const hadoop::hdfs::ExtendedBlockProto *block,
                        uint64_t length, uint64_t offset)
      : reader_(reader), client_name_(client_name), length_(length),
        offset_(offset) {
    // The security token is optional; copy it only when supplied.
    if (token) {
      token_.reset(new hadoop::common::TokenProto());
      token_->CheckTypeAndMergeFrom(*token);
    }
    block_.CheckTypeAndMergeFrom(*block);
  }

  virtual void Run(const Next &next) override {
    reader_->async_connect(client_name_, token_.get(), &block_, length_,
                           offset_, next);
  }

private:
  Reader *reader_;
  const std::string client_name_;
  std::unique_ptr<hadoop::common::TokenProto> token_;
  hadoop::hdfs::ExtendedBlockProto block_;
  uint64_t length_;
  uint64_t offset_;
};

// Pipeline stage that keeps issuing async_read_some until the supplied
// buffer is full or an error occurs.  *transferred_ accumulates the byte
// count across partial reads.
template <class Reader, class MutableBufferSequence>
struct InputStreamImpl::ReadBlockContinuation : continuation::Continuation {
  ReadBlockContinuation(Reader *reader, MutableBufferSequence buffer,
                        size_t *transferred)
      : reader_(reader), buffer_(buffer),
        buffer_size_(asio::buffer_size(buffer)), transferred_(transferred) {
    // The continuation stores its own copy of the buffer descriptor; a
    // reference type would dangle once the caller returns.
    static_assert(!std::is_reference<MutableBufferSequence>::value,
                  "Buffer must not be a reference type");
  }

  virtual void Run(const Next &next) override {
    *transferred_ = 0;
    next_ = next;
    OnReadData(Status::OK(), 0);
  }

private:
  Reader *reader_;
  const MutableBufferSequence buffer_;
  const size_t buffer_size_;
  size_t *transferred_;
  std::function<void(const Status &)> next_;

  // Re-arms itself until the buffer is filled.  Binds |this|, so the
  // pipeline must keep this continuation alive until next_ fires.
  void OnReadData(const Status &status, size_t transferred) {
    using std::placeholders::_1;
    using std::placeholders::_2;
    *transferred_ += transferred;
    if (!status.ok()) {
      next_(status);
    } else if (*transferred_ >= buffer_size_) {
      next_(status);
    } else {
      reader_->async_read_some(
          asio::buffer(buffer_ + *transferred_, buffer_size_ - *transferred_),
          std::bind(&ReadBlockContinuation::OnReadData, this, _1, _2));
    }
  }
};

// Reads from the single block containing |offset|.  The read is clamped to
// that block's end, so the handler may report fewer bytes than requested.
template <class MutableBufferSequence, class Handler>
void InputStreamImpl::AsyncPreadSome(
    size_t offset, const MutableBufferSequence &buffers,
    const std::set<std::string> &excluded_datanodes, const Handler &handler) {
  using ::hadoop::hdfs::DatanodeInfoProto;
  using ::hadoop::hdfs::LocatedBlockProto;

  // Linear scan for the block whose byte range covers |offset|.
  auto it = std::find_if(
      blocks_.begin(), blocks_.end(), [offset](const LocatedBlockProto &p) {
        return p.offset() <= offset && offset < p.offset() + p.b().numbytes();
      });

  if (it == blocks_.end()) {
    handler(Status::InvalidArgument("Cannot find corresponding blocks"), "", 0);
    return;
  }

  // Pick the first replica that is not on the caller's exclusion list.
  const DatanodeInfoProto *chosen_dn = nullptr;
  for (int i = 0; i < it->locs_size(); ++i) {
    const auto &di = it->locs(i);
    if (!excluded_datanodes.count(di.id().datanodeuuid())) {
      chosen_dn = &di;
      break;
    }
  }

  if (!chosen_dn) {
    handler(Status::ResourceUnavailable("No datanodes available"), "", 0);
    return;
  }

  // Clamp the request to the end of the containing block.
  uint64_t offset_within_block = offset - it->offset();
  uint64_t size_within_block = std::min<uint64_t>(
      it->b().numbytes() - offset_within_block, asio::buffer_size(buffers));

  AsyncReadBlock<RemoteBlockReaderTrait>(
      fs_->rpc_engine().client_name(), *it, *chosen_dn, offset_within_block,
      asio::buffer(buffers, size_within_block), handler);
}

// Connects to |dn|, handshakes for |block|, reads into |buffers|, and
// finally reports (status, datanode uuid, bytes transferred).
template <class BlockReaderTrait, class MutableBufferSequence, class Handler>
void InputStreamImpl::AsyncReadBlock(
    const std::string &client_name,
    const hadoop::hdfs::LocatedBlockProto &block,
    const hadoop::hdfs::DatanodeInfoProto &dn, size_t offset,
    const MutableBufferSequence &buffers, const Handler &handler) {

  typedef typename BlockReaderTrait::Reader Reader;
  auto m =
      BlockReaderTrait::CreatePipeline(&fs_->rpc_engine().io_service(), dn);
  auto &s = m->state();
  size_t size = asio::buffer_size(buffers);
  // NOTE(review): a nullptr security token is passed here — confirm whether
  // token support is still pending for this read path.
  m->Push(new HandshakeContinuation<Reader>(s.reader(), client_name, nullptr,
                                            &block.b(), size, offset))
      .Push(new ReadBlockContinuation<Reader, MutableBufferSequence>(
          s.reader(), buffers, s.transferred()));
  // The uuid is captured by value into the lambda; |dn| itself need not
  // outlive the asynchronous run.
  const std::string &dnid = dn.id().datanodeuuid();
  m->Run([handler, dnid](const Status &status,
                         const typename BlockReaderTrait::State &state) {
    handler(status, dnid, *state.transferred());
  });
}
}

#endif

# Generate C++ sources from the HDFS and Hadoop protocol buffer definitions.
set(PROTOBUF_IMPORT_DIRS ${PROTO_HDFS_DIR} ${PROTO_HADOOP_DIR})

# NOTE(review): datatransfer.proto is listed twice below; protobuf_generate_cpp
# may emit duplicate outputs — consider removing one occurrence.
protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS
  ${PROTO_HDFS_DIR}/datatransfer.proto
  ${PROTO_HDFS_DIR}/ClientDatanodeProtocol.proto
  ${PROTO_HDFS_DIR}/ClientNamenodeProtocol.proto
  ${PROTO_HDFS_DIR}/acl.proto
  ${PROTO_HDFS_DIR}/datatransfer.proto
  ${PROTO_HDFS_DIR}/encryption.proto
  ${PROTO_HDFS_DIR}/erasurecoding.proto
  ${PROTO_HDFS_DIR}/hdfs.proto
  ${PROTO_HDFS_DIR}/inotify.proto
  ${PROTO_HDFS_DIR}/xattr.proto
  ${PROTO_HADOOP_DIR}/IpcConnectionContext.proto
  ${PROTO_HADOOP_DIR}/ProtobufRpcEngine.proto
  ${PROTO_HADOOP_DIR}/RpcHeader.proto
  ${PROTO_HADOOP_DIR}/Security.proto
)

# Custom protoc plugin that emits the .hrpc.inl async stubs.
add_executable(protoc-gen-hrpc protoc_gen_hrpc.cc)
target_link_libraries(protoc-gen-hrpc ${PROTOBUF_PROTOC_LIBRARY} ${PROTOBUF_LIBRARY})

# Runs protoc with the protoc-gen-hrpc plugin on each listed .proto file and
# collects the generated <name>.hrpc.inl paths into the variable named ${SRCS}.
function(GEN_HRPC SRCS)
  if(NOT ARGN)
    message(SEND_ERROR "Error: GEN_HRPC() called without any proto files")
    return()
  endif()

  # Build the -I include path list from PROTOBUF_IMPORT_DIRS, de-duplicated.
  if(DEFINED PROTOBUF_IMPORT_DIRS)
    foreach(DIR ${PROTOBUF_IMPORT_DIRS})
      get_filename_component(ABS_PATH ${DIR} ABSOLUTE)
      list(FIND _protobuf_include_path ${ABS_PATH} _contains_already)
      if(${_contains_already} EQUAL -1)
        list(APPEND _protobuf_include_path -I ${ABS_PATH})
      endif()
    endforeach()
endif()

  set(${SRCS})

  # One custom command per .proto: run protoc with the locally built
  # protoc-gen-hrpc plugin to produce <name>.hrpc.inl in the binary dir.
  foreach(FIL ${ARGN})
    get_filename_component(ABS_FIL ${FIL} ABSOLUTE)
    get_filename_component(FIL_WE ${FIL} NAME_WE)

    list(APPEND ${SRCS} "${CMAKE_CURRENT_BINARY_DIR}/${FIL_WE}.hrpc.inl")

    add_custom_command(
      OUTPUT "${CMAKE_CURRENT_BINARY_DIR}/${FIL_WE}.hrpc.inl"
      COMMAND ${PROTOBUF_PROTOC_EXECUTABLE}
      # The plugin binary is built by this same CMakeLists (see DEPENDS).
      ARGS --plugin=protoc-gen-hrpc=${CMAKE_CURRENT_BINARY_DIR}/protoc-gen-hrpc --hrpc_out=${CMAKE_CURRENT_BINARY_DIR} ${_protobuf_include_path} ${ABS_FIL}
      DEPENDS ${ABS_FIL} ${PROTOBUF_PROTOC_EXECUTABLE} protoc-gen-hrpc
      COMMENT "Running HRPC protocol buffer compiler on ${FIL}"
      VERBATIM )
  endforeach()

  # Mark outputs GENERATED so dependents do not require them to pre-exist.
  set_source_files_properties(${${SRCS}} PROPERTIES GENERATED TRUE)
  set(${SRCS} ${${SRCS}} PARENT_SCOPE)
endfunction()

gen_hrpc(HRPC_SRCS
  ${PROTO_HDFS_DIR}/ClientNamenodeProtocol.proto
)

add_library(proto ${PROTO_SRCS} ${PROTO_HDRS} ${HRPC_SRCS})

// Protocol Buffers - Google's data interchange format
// Copyright 2008 Google Inc.  All rights reserved.
// https://developers.google.com/protocol-buffers/
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
//     * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +// Author: ken...@google.com (Kenton Varda) +// Based on original Protocol Buffers design by +// Sanjay Ghemawat, Jeff Dean, and others. + +#ifndef LIBHDFSPP_PROTO_CPP_HELPERS_H_ +#define LIBHDFSPP_PROTO_CPP_HELPERS_H_ + +#include <string> + +/** + * The functions in this file are derived from the original implementation of + *the protobuf library from Google. 
/**
 * The functions in this file are derived from the original implementation of
 * the protobuf library from Google.
 **/

// Drop a trailing ".proto" extension if present; otherwise return the name
// unchanged.  Mirrors protoc's own StripProto helper.
static inline std::string StripProto(const std::string &str) {
  static const std::string kExtension = ".proto";
  const size_t ext_len = kExtension.size();
  const bool ends_with_ext =
      str.size() >= ext_len &&
      str.compare(str.size() - ext_len, ext_len, kExtension) == 0;
  return ends_with_ext ? str.substr(0, str.size() - ext_len) : str;
}

// Convert an identifier such as "get_block_locations" to "GetBlockLocations".
// Deliberately ASCII-only: ctype.h classification is locale-sensitive.
static inline std::string ToCamelCase(const std::string &input) {
  std::string result;
  bool upcase_next = true;
  for (const char c : input) {
    if (c >= 'a' && c <= 'z') {
      result += upcase_next ? static_cast<char>(c - 'a' + 'A') : c;
      upcase_next = false;
    } else if (c >= 'A' && c <= 'Z') {
      // Existing capitals pass through untouched.
      result += c;
      upcase_next = false;
    } else if (c >= '0' && c <= '9') {
      // Digits are kept and force the following letter to upper case.
      result += c;
      upcase_next = true;
    } else {
      // Separators (e.g. '_') are dropped and capitalize what follows.
      upcase_next = true;
    }
  }
  return result;
}

// NOTE(review): the include guard opened at the top of this header is closed
// at end of header in the upstream file.
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// protoc plugin that generates thin asynchronous RPC stubs ("hrpc") for
// every service defined in a .proto file.  For Foo.proto it writes
// Foo.hrpc.inl containing one stub class per service.

#include "cpp_helpers.h"

#include <google/protobuf/compiler/code_generator.h>
#include <google/protobuf/compiler/plugin.h>
#include <google/protobuf/descriptor.h>
#include <google/protobuf/io/printer.h>
#include <google/protobuf/io/zero_copy_stream.h>
#include <google/protobuf/stubs/common.h>

#include <memory>

using ::google::protobuf::FileDescriptor;
using ::google::protobuf::MethodDescriptor;
using ::google::protobuf::ServiceDescriptor;
using ::google::protobuf::compiler::CodeGenerator;
using ::google::protobuf::compiler::GeneratorContext;
using ::google::protobuf::io::Printer;
using ::google::protobuf::io::ZeroCopyOutputStream;

// CodeGenerator invoked by protoc via --plugin=protoc-gen-hrpc.
class StubGenerator : public CodeGenerator {
public:
  virtual bool Generate(const FileDescriptor *file, const std::string &,
                        GeneratorContext *ctx,
                        std::string *error) const override;

private:
  // Emits the stub class for one service declaration.
  void EmitService(const ServiceDescriptor *service, Printer *out) const;
  // Emits one async stub method inside the current service class.
  void EmitMethod(const MethodDescriptor *method, Printer *out) const;
};

bool StubGenerator::Generate(const FileDescriptor *file, const std::string &,
                             GeneratorContext *ctx, std::string *) const {
  // NOTE(review): |error| is ignored and this always returns true; the
  // |pb| alias below is also unused.
  namespace pb = ::google::protobuf;
  // Output file name mirrors protoc's convention: Foo.proto -> Foo.hrpc.inl.
  std::unique_ptr<ZeroCopyOutputStream> os(
      ctx->Open(StripProto(file->name()) + ".hrpc.inl"));
  Printer out(os.get(), '$');
  for (int i = 0; i < file->service_count(); ++i) {
    const ServiceDescriptor *service = file->service(i);
    EmitService(service, &out);
  }
  return true;
}

// The generated class wraps a non-owning RpcEngine pointer and exposes one
// async method per RPC defined in the service.
void StubGenerator::EmitService(const ServiceDescriptor *service,
                                Printer *out) const {
  out->Print("\n// GENERATED AUTOMATICALLY. DO NOT MODIFY.\n"
             "class $service$ {\n"
             "private:\n"
             "  ::hdfs::RpcEngine *const engine_;\n"
             "public:\n"
             "  typedef std::function<void(const ::hdfs::Status &)> Callback;\n"
             "  typedef ::google::protobuf::MessageLite Message;\n"
             "  inline $service$(::hdfs::RpcEngine *engine)\n"
             "      : engine_(engine) {}\n",
             "service", service->name());
  for (int i = 0; i < service->method_count(); ++i) {
    const MethodDescriptor *method = service->method(i);
    EmitMethod(method, out);
  }
  out->Print("};\n");
}

// Each stub method is CamelCased (e.g. get_foo -> GetFoo) and simply
// forwards to RpcEngine::AsyncRpc with the wire-level method name.
void StubGenerator::EmitMethod(const MethodDescriptor *method,
                               Printer *out) const {
  out->Print(
      "\n  inline void $camel_method$(const Message *req, "
      "const std::shared_ptr<Message> &resp, "
      "const Callback &handler) {\n"
      "    engine_->AsyncRpc(\"$method$\", req, resp, handler);\n"
      "  }\n",
      "camel_method", ToCamelCase(method->name()), "method", method->name());
}

// Standard protoc plugin entry point: protobuf drives the generator over
// stdin/stdout.
int main(int argc, char *argv[]) {
  StubGenerator generator;
  return google::protobuf::compiler::PluginMain(argc, argv, &generator);
}