http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.cc new file mode 100644 index 0000000..d29f1e9 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.cc @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "namenode_info.h" + +#include "common/util.h" +#include "common/logging.h" + +#include <sstream> +#include <utility> +#include <future> +#include <memory> + +namespace hdfs { + +ResolvedNamenodeInfo& ResolvedNamenodeInfo::operator=(const NamenodeInfo &info) { + nameservice = info.nameservice; + name = info.name; + uri = info.uri; + return *this; +} + + + +std::string ResolvedNamenodeInfo::str() const { + std::stringstream ss; + ss << "ResolvedNamenodeInfo {nameservice: " << nameservice << ", name: " << name << ", uri: " << uri.str(); + ss << ", host: " << uri.get_host(); + + if(uri.has_port()) + ss << ", port: " << uri.get_port(); + else + ss << ", invalid port (uninitialized)"; + + ss << ", scheme: " << uri.get_scheme(); + + ss << " ["; + for(unsigned int i=0;i<endpoints.size();i++) + ss << endpoints[i] << " "; + ss << "] }"; + + return ss.str(); +} + + +bool ResolveInPlace(::asio::io_service *ioservice, ResolvedNamenodeInfo &info) { + // this isn't very memory friendly, but if it needs to be called often there are bigger issues at hand + info.endpoints.clear(); + std::vector<ResolvedNamenodeInfo> resolved = BulkResolve(ioservice, {info}); + if(resolved.size() != 1) + return false; + + info.endpoints = resolved[0].endpoints; + if(info.endpoints.size() == 0) + return false; + return true; +} + +typedef std::vector<asio::ip::tcp::endpoint> endpoint_vector; + +// RAII wrapper +class ScopedResolver { + private: + ::asio::io_service *io_service_; + std::string host_; + std::string port_; + ::asio::ip::tcp::resolver::query query_; + ::asio::ip::tcp::resolver resolver_; + endpoint_vector endpoints_; + + // Caller blocks on access if resolution isn't finished + std::shared_ptr<std::promise<Status>> result_status_; + public: + ScopedResolver(::asio::io_service *service, const std::string &host, const std::string &port) : + io_service_(service), host_(host), port_(port), query_(host, port), resolver_(*io_service_) + { + if(!io_service_) + LOG_ERROR(kAsyncRuntime, << "ScopedResolver@" << this << " passed nullptr to io_service"); + } + + ~ScopedResolver() { + 
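+ // Cancel any lookup still in flight; asio invokes the outstanding async_resolve handler with asio::error::operation_aborted.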
resolver_.cancel(); + } + + bool BeginAsyncResolve() { + // result_status_ would only exist if this was previously called. Invalid state. + if(result_status_) { + LOG_ERROR(kAsyncRuntime, << "ScopedResolver@" << this << "::BeginAsyncResolve invalid call: may only be called once per instance"); + return false; + } else if(!io_service_) { + LOG_ERROR(kAsyncRuntime, << "ScopedResolver@" << this << "::BeginAsyncResolve invalid call: null io_service"); + return false; + } + + // Now set up the promise, set it in async_resolve's callback + result_status_ = std::make_shared<std::promise<Status>>(); + std::shared_ptr<std::promise<Status>> shared_result = result_status_; + + // Callback to pull a copy of endpoints out of resolver and set promise + auto callback = [this, shared_result](const asio::error_code &ec, ::asio::ip::tcp::resolver::iterator out) { + if(!ec) { + std::copy(out, ::asio::ip::tcp::resolver::iterator(), std::back_inserter(endpoints_)); + } + shared_result->set_value( ToStatus(ec) ); + }; + resolver_.async_resolve(query_, callback); + return true; + } + + Status Join() { + if(!result_status_) { + std::ostringstream errmsg; + errmsg << "ScopedResolver@" << this << "::Join invalid call: promise never set"; + return Status::InvalidArgument(errmsg.str().c_str()); + } + + std::future<Status> future_result = result_status_->get_future(); + Status res = future_result.get(); + return res; + } + + endpoint_vector GetEndpoints() { + // Explicitly return by value to decouple lifecycles. + return endpoints_; + } +}; + +std::vector<ResolvedNamenodeInfo> BulkResolve(::asio::io_service *ioservice, const std::vector<NamenodeInfo> &nodes) { + std::vector< std::unique_ptr<ScopedResolver> > resolvers; + resolvers.reserve(nodes.size()); + + std::vector<ResolvedNamenodeInfo> resolved_info; + resolved_info.reserve(nodes.size()); + + for(unsigned int i=0; i<nodes.size(); i++) { + std::string host = nodes[i].get_host(); + std::string port = nodes[i].get_port(); + + resolvers.emplace_back(new ScopedResolver(ioservice, host, port)); + resolvers[i]->BeginAsyncResolve(); + } + + // Join all async operations + for(unsigned int i=0; i < resolvers.size(); i++) { + Status asyncReturnStatus = resolvers[i]->Join(); + + ResolvedNamenodeInfo info; + info = nodes[i]; + + if(asyncReturnStatus.ok()) { + // Copy out endpoints if things went well + info.endpoints = resolvers[i]->GetEndpoints(); + } else { + LOG_ERROR(kAsyncRuntime, << "Unable to resolve endpoints for host: " << nodes[i].get_host() + << " port: " << nodes[i].get_port()); + } + + resolved_info.push_back(info); + } + return resolved_info; +} + +}
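Editor's note: a minimal usage sketch for the resolution helpers above, not part of the patch. It assumes a worker thread services the io_service, since ScopedResolver::Join() blocks on a promise that is only fulfilled from an io_service callback, and it leaves the NamenodeInfo entries default-constructed purely for illustration (in real use they are populated from the Configuration).

// Sketch: batch-resolve namenode hostnames with hdfs::BulkResolve.
#include "common/namenode_info.h"
#include <asio.hpp>
#include <iostream>
#include <thread>

int main() {
  ::asio::io_service io_service;
  ::asio::io_service::work keep_alive(io_service); // keeps run() from returning while idle
  std::thread worker([&io_service]() { io_service.run(); });

  std::vector<hdfs::NamenodeInfo> nodes(2); // would normally be filled from the Configuration

  // Lookups run concurrently; failed lookups come back with an empty endpoint list.
  std::vector<hdfs::ResolvedNamenodeInfo> resolved = hdfs::BulkResolve(&io_service, nodes);
  for (const hdfs::ResolvedNamenodeInfo &info : resolved) {
    if (!info.endpoints.empty())
      std::cout << info.str() << std::endl;
  }

  io_service.stop();
  worker.join();
  return 0;
}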
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.h new file mode 100644 index 0000000..fdee8d7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.h @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef COMMON_HDFS_NAMENODE_INFO_H_ +#define COMMON_HDFS_NAMENODE_INFO_H_ + +#include <asio.hpp> +#include <hdfspp/options.h> + +#include <string> +#include <vector> + +namespace hdfs { + +// Internal representation of namenode info that keeps track +// of its endpoints. +struct ResolvedNamenodeInfo : public NamenodeInfo { + ResolvedNamenodeInfo& operator=(const NamenodeInfo &info); + std::string str() const; + + std::vector<::asio::ip::tcp::endpoint> endpoints; +}; + +// Clear endpoints if set and resolve all of them in parallel. +// Only successful lookups will be placed in the result set. +std::vector<ResolvedNamenodeInfo> BulkResolve(::asio::io_service *ioservice, const std::vector<NamenodeInfo> &nodes); + +// Clear endpoints, if any, and resolve them again +// Return true if endpoints were resolved +bool ResolveInPlace(::asio::io_service *ioservice, ResolvedNamenodeInfo &info); + +} + +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/new_delete.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/new_delete.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/new_delete.h new file mode 100644 index 0000000..48f97ca --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/new_delete.h @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef COMMON_HDFS_NEW_DELETE_H_ +#define COMMON_HDFS_NEW_DELETE_H_ + +#include <cstring> +#include <cstdlib> /* ::malloc and ::free used below */ + +struct mem_struct { + size_t mem_size; +}; + +#ifndef NDEBUG +#define MEMCHECKED_CLASS(clazz) \ +static void* operator new(size_t size) { \ + void* p = ::malloc(size); \ + return p; \ +} \ +static void* operator new[](size_t size) { \ + mem_struct* p = (mem_struct*)::malloc(sizeof(mem_struct) + size); \ + p->mem_size = size; \ + return (void*)++p; \ +} \ +static void operator delete(void* p) { \ + ::memset(p, 0, sizeof(clazz)); \ + ::free(p); \ +} \ +static void operator delete[](void* p) { \ + mem_struct* header = (mem_struct*)p; \ + size_t size = (--header)->mem_size; \ + ::memset(p, 0, size); \ + ::free(header); \ +} +#else +#define MEMCHECKED_CLASS(clazz) +#endif +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/optional_wrapper.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/optional_wrapper.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/optional_wrapper.h new file mode 100644 index 0000000..2d15dc3 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/optional_wrapper.h @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +#ifndef COMMON_OPTIONAL_WRAPPER_H_ +#define COMMON_OPTIONAL_WRAPPER_H_ + +#ifdef __clang__ + #pragma clang diagnostic push + #if __has_warning("-Wweak-vtables") + #pragma clang diagnostic ignored "-Wweak-vtables" + #endif + #if __has_warning("-Wreserved-id-macro") + #pragma clang diagnostic ignored "-Wreserved-id-macro" + #endif + #if __has_warning("-Wextra-semi") + #pragma clang diagnostic ignored "-Wextra-semi" + #endif + #define TR2_OPTIONAL_DISABLE_EMULATION_OF_TYPE_TRAITS //For Clang < 3_4_2 +#endif + +#include <optional.hpp> + +#ifdef __clang__ + #undef TR2_OPTIONAL_DISABLE_EMULATION_OF_TYPE_TRAITS //For Clang < 3_4_2 + #pragma clang diagnostic pop +#endif + +#endif //COMMON_OPTIONAL_WRAPPER_H_ http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc new file mode 100644 index 0000000..48c83f1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "hdfspp/options.h" + +namespace hdfs { + +// The linker needs a place to put all of those constants +const int Options::kDefaultRpcTimeout; +const int Options::kNoRetry; +const int Options::kDefaultMaxRpcRetries; +const int Options::kDefaultRpcRetryDelayMs; +const unsigned int Options::kDefaultHostExclusionDuration; +const unsigned int Options::kDefaultFailoverMaxRetries; +const unsigned int Options::kDefaultFailoverConnectionMaxRetries; +const long Options::kDefaultBlockSize; + +Options::Options() : rpc_timeout(kDefaultRpcTimeout), + rpc_connect_timeout(kDefaultRpcConnectTimeout), + max_rpc_retries(kDefaultMaxRpcRetries), + rpc_retry_delay_ms(kDefaultRpcRetryDelayMs), + host_exclusion_duration(kDefaultHostExclusionDuration), + defaultFS(), + failover_max_retries(kDefaultFailoverMaxRetries), + failover_connection_max_retries(kDefaultFailoverConnectionMaxRetries), + authentication(kDefaultAuthentication), + block_size(kDefaultBlockSize), + io_threads_(kDefaultIoThreads) +{ + +} + +std::string NamenodeInfo::get_host() const { + return uri.get_host(); +} + +std::string NamenodeInfo::get_port() const { + if(uri.has_port()) { + return std::to_string(uri.get_port()); + } + return "-1"; +} + + + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/retry_policy.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/retry_policy.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/retry_policy.cc new file mode 100644 index 0000000..dca49fb --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/retry_policy.cc @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "common/retry_policy.h" +#include "common/logging.h" + +#include <sstream> + +namespace hdfs { + +RetryAction FixedDelayRetryPolicy::ShouldRetry( + const Status &s, uint64_t retries, uint64_t failovers, + bool isIdempotentOrAtMostOnce) const { + LOG_TRACE(kRPC, << "FixedDelayRetryPolicy::ShouldRetry(retries=" << retries << ", failovers=" << failovers << ")"); + (void)isIdempotentOrAtMostOnce; + if (retries + failovers >= max_retries_) { + return RetryAction::fail( + "Failovers and retries(" + std::to_string(retries + failovers) + + ") exceeded maximum retries (" + std::to_string(max_retries_) + "), Status: " + + s.ToString()); + } else { + return RetryAction::retry(delay_); + } +} + + +RetryAction NoRetryPolicy::ShouldRetry( + const Status &s, uint64_t retries, uint64_t failovers, + bool isIdempotentOrAtMostOnce) const { + LOG_TRACE(kRPC, << "NoRetryPolicy::ShouldRetry(retries=" << retries << ", failovers=" << failovers << ")"); + (void)retries; + (void)failovers; + (void)isIdempotentOrAtMostOnce; + return RetryAction::fail("No retry, Status: " + s.ToString()); +} + + +RetryAction FixedDelayWithFailover::ShouldRetry(const Status &s, uint64_t retries, + uint64_t failovers, + bool isIdempotentOrAtMostOnce) const { + (void)isIdempotentOrAtMostOnce; + (void)max_failover_conn_retries_; + LOG_TRACE(kRPC, << "FixedDelayWithFailover::ShouldRetry(retries=" << retries << ", failovers=" << failovers << ")"); + + if(failovers < max_failover_retries_ && (s.code() == ::asio::error::timed_out || s.get_server_exception_type() == Status::kStandbyException) ) + { + // Try connecting to another NN in case this one keeps timing out + // Can add the backoff wait specified by dfs.client.failover.sleep.base.millis here + if(failovers == 0) { + // No delay on first failover if it looks like the NN was bad. + return RetryAction::failover(0); + } else { + return RetryAction::failover(delay_); + } + } + + if(retries < max_retries_ && failovers < max_failover_retries_) { + LOG_TRACE(kRPC, << "FixedDelayWithFailover::ShouldRetry: retries < max_retries_ && failovers < max_failover_retries_"); + return RetryAction::retry(delay_); + } else if (retries >= max_retries_ && failovers < max_failover_retries_) { + LOG_TRACE(kRPC, << "FixedDelayWithFailover::ShouldRetry: retries >= max_retries_ && failovers < max_failover_retries_"); + return RetryAction::failover(delay_); + } else if (retries <= max_retries_ && failovers == max_failover_retries_) { + LOG_TRACE(kRPC, << "FixedDelayWithFailover::ShouldRetry: retries <= max_retries_ && failovers == max_failover_retries_"); + // 1 last retry on new connection + return RetryAction::retry(delay_); + } + + return RetryAction::fail("Retry and failover didn't work, Status: " + s.ToString()); +} + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/retry_policy.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/retry_policy.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/retry_policy.h new file mode 100644 index 0000000..0b5bd80 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/retry_policy.h @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIB_COMMON_RETRY_POLICY_H_ +#define LIB_COMMON_RETRY_POLICY_H_ + +#include "common/util.h" + +#include <string> +#include <stdint.h> + +namespace hdfs { + +class RetryAction { + public: + enum RetryDecision { FAIL, RETRY, FAILOVER_AND_RETRY }; + + RetryDecision action; + uint64_t delayMillis; + std::string reason; + + RetryAction(RetryDecision in_action, uint64_t in_delayMillis, + const std::string &in_reason) + : action(in_action), delayMillis(in_delayMillis), reason(in_reason) {} + + static RetryAction fail(const std::string &reason) { + return RetryAction(FAIL, 0, reason); + } + static RetryAction retry(uint64_t delay) { + return RetryAction(RETRY, delay, ""); + } + static RetryAction failover(uint64_t delay) { + return RetryAction(FAILOVER_AND_RETRY, delay, ""); + } + + std::string decision_str() const { + switch(action) { + case FAIL: return "FAIL"; + case RETRY: return "RETRY"; + case FAILOVER_AND_RETRY: return "FAILOVER_AND_RETRY"; + default: return "UNDEFINED ACTION"; + } + }; +}; + +class RetryPolicy { + protected: + uint64_t delay_; + uint64_t max_retries_; + RetryPolicy(uint64_t delay, uint64_t max_retries) : + delay_(delay), max_retries_(max_retries) {} + + public: + RetryPolicy() {}; + + virtual ~RetryPolicy() {} + /* + * If there was an error in communications, responds with the configured + * action to take. + */ + virtual RetryAction ShouldRetry(const Status &s, uint64_t retries, + uint64_t failovers, + bool isIdempotentOrAtMostOnce) const = 0; + + virtual std::string str() const { return "Base RetryPolicy"; } +}; + + +/* + * Overview of how the failover retry policy works: + * + * 1) Acts the same as FixedDelayRetryPolicy in terms of connection retries against a single NN + * with two differences: + * a) If we have retried more than the maximum number of retries we will failover to the + * other node and reset the retry counter rather than error out. It will begin the same + * routine on the other node. + * b) If an attempted connection times out and max_failover_conn_retries_ is less than the + * normal number of retries it will failover sooner. The connection timeout retry limit + * defaults to zero; the idea being that if a node is unresponsive it's better to just + * try the secondary rather than incur the timeout cost multiple times. + * + * 2) Keeps track of the failover count in the same way that the retry count is tracked. If failover + * is triggered more than a set number (dfs.client.failover.max.attempts) of times then the operation + * will error out in the same way that a non-HA operation would error if it ran out of retries. + * + * 3) Failover between namenodes isn't instantaneous so the RPC retry delay is reused to add a small + * delay between failover attempts. 
This helps prevent the client from quickly using up all of + * its failover attempts while thrashing between namenodes that are both temporarily marked standby. + * Note: The Java client implements exponential backoff here with a base other than the RPC delay. + * This policy does not yet implement backoff of any kind; once it does, it can be renamed + * ExponentialDelayWithFailover. + */ +class FixedDelayWithFailover : public RetryPolicy { + public: + FixedDelayWithFailover(uint64_t delay, uint64_t max_retries, + uint64_t max_failover_retries, + uint64_t max_failover_conn_retries) + : RetryPolicy(delay, max_retries), max_failover_retries_(max_failover_retries), + max_failover_conn_retries_(max_failover_conn_retries) {} + + RetryAction ShouldRetry(const Status &s, uint64_t retries, + uint64_t failovers, + bool isIdempotentOrAtMostOnce) const override; + + std::string str() const override { return "FixedDelayWithFailover"; } + + private: + // Maximum number of failover attempts + uint64_t max_failover_retries_; + // Fail over if a connection times out rather than trying to connect and + // waiting out the timeout delay max_failover_retries_ times. + uint64_t max_failover_conn_retries_; +}; + + +/* + * Returns a fixed delay up to a certain number of retries + */ +class FixedDelayRetryPolicy : public RetryPolicy { + public: + FixedDelayRetryPolicy(uint64_t delay, uint64_t max_retries) + : RetryPolicy(delay, max_retries) {} + + RetryAction ShouldRetry(const Status &s, uint64_t retries, + uint64_t failovers, + bool isIdempotentOrAtMostOnce) const override; + + std::string str() const override { return "FixedDelayRetryPolicy"; } +}; + +/* + * Never retries + */ +class NoRetryPolicy : public RetryPolicy { + public: + NoRetryPolicy() {}; + RetryAction ShouldRetry(const Status &s, uint64_t retries, + uint64_t failovers, + bool isIdempotentOrAtMostOnce) const override; + + std::string str() const override { return "NoRetryPolicy"; } +}; +} + +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/sasl_authenticator.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/sasl_authenticator.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/sasl_authenticator.h new file mode 100644 index 0000000..78b2a55 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/sasl_authenticator.h @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +#ifndef LIB_COMMON_SASL_AUTHENTICATOR_H_ +#define LIB_COMMON_SASL_AUTHENTICATOR_H_ + +#include "hdfspp/status.h" + +namespace hdfs { + +class DigestMD5AuthenticatorTest_TestResponse_Test; + +/** + * A specialized implementation of RFC 2831 for the HDFS + * DataTransferProtocol. + * + * The current implementation lacks the following features: + * * Encoding the username, realm, and password in ISO-8859-1 when + * it is required by the RFC. They are always encoded in UTF-8. + * * Checking whether the challenges from the server are + * well-formed. + * * Specifying authzid, digest-uri and maximum buffer size. + * * Supporting QOP other than the auth level. + **/ +class DigestMD5Authenticator { +public: + Status EvaluateResponse(const std::string &payload, std::string *result); + DigestMD5Authenticator(const std::string &username, + const std::string &password, bool mock_nonce = false); + +private: + Status GenerateFirstResponse(std::string *result); + Status GenerateResponseValue(std::string *response_value); + Status ParseFirstChallenge(const std::string &payload); + + static size_t NextToken(const std::string &payload, size_t off, + std::string *tok); + void GenerateCNonce(); + std::string username_; + std::string password_; + std::string nonce_; + std::string cnonce_; + std::string realm_; + std::string qop_; + unsigned nonce_count_; + + const bool TEST_mock_cnonce_; + friend class DigestMD5AuthenticatorTest_TestResponse_Test; +}; +} + +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/sasl_digest_md5.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/sasl_digest_md5.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/sasl_digest_md5.cc new file mode 100644 index 0000000..3ca8578 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/sasl_digest_md5.cc @@ -0,0 +1,240 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +#include "sasl_authenticator.h" + +#include "common/util.h" + +#include <openssl/rand.h> +#include <openssl/md5.h> + +#include <iomanip> +#include <map> +#include <sstream> + +namespace hdfs { + +static std::string QuoteString(const std::string &src); +static std::string GetMD5Digest(const std::string &src); +static std::string BinaryToHex(const std::string &src); + +static const char kDigestUri[] = "hdfs/0"; +static const size_t kMaxBufferSize = 65536; + +DigestMD5Authenticator::DigestMD5Authenticator(const std::string &username, + const std::string &password, + bool mock_nonce) + : username_(username), password_(password), nonce_count_(0), + TEST_mock_cnonce_(mock_nonce) {} + +Status DigestMD5Authenticator::EvaluateResponse(const std::string &payload, + std::string *result) { + Status status = ParseFirstChallenge(payload); + if (status.ok()) { + status = GenerateFirstResponse(result); + } + return status; +} + +size_t DigestMD5Authenticator::NextToken(const std::string &payload, size_t off, + std::string *tok) { + tok->clear(); + if (off >= payload.size()) { + return std::string::npos; + } + + char c = payload[off]; + if (c == '=' || c == ',') { + *tok = c; + return off + 1; + } + + int quote_count = 0; + for (; off < payload.size(); ++off) { + char c = payload[off]; + if (c == '"') { + ++quote_count; + if (quote_count == 2) { + return off + 1; + } + continue; + } + + if (c == '=') { + if (quote_count) { + tok->append(&c, 1); + } else { + break; + } + } else if (('0' <= c && c <= '9') || ('a' <= c && c <= 'z') || + ('A' <= c && c <= 'Z') || c == '+' || c == '/' || c == '-' || + c == '_' || c == '@') { + tok->append(&c, 1); + } else { + break; + } + } + return off; +} + +void DigestMD5Authenticator::GenerateCNonce() { + if (!TEST_mock_cnonce_) { + char buf[8] = {0,}; + RAND_pseudo_bytes(reinterpret_cast<unsigned char *>(buf), sizeof(buf)); + cnonce_ = Base64Encode(std::string(buf, sizeof(buf))); + } +} + +Status DigestMD5Authenticator::ParseFirstChallenge(const std::string &payload) { + std::map<std::string, std::string> props; + std::string token; + enum { + kStateLVal, + kStateEqual, + kStateRVal, + kStateCommaOrEnd, + }; + + int state = kStateLVal; + + std::string lval, rval; + size_t off = 0; + while (true) { + off = NextToken(payload, off, &token); + if (off == std::string::npos) { + break; + } + + switch (state) { + case kStateLVal: + lval = token; + state = kStateEqual; + break; + case kStateEqual: + state = kStateRVal; + break; + case kStateRVal: + rval = token; + props[lval] = rval; + state = kStateCommaOrEnd; + break; + case kStateCommaOrEnd: + state = kStateLVal; + break; + } + } + + if (props["algorithm"] != "md5-sess" || props["charset"] != "utf-8" || + props.find("nonce") == props.end()) { + return Status::Error("Invalid challenge"); + } + realm_ = props["realm"]; + nonce_ = props["nonce"]; + qop_ = props["qop"]; + return Status::OK(); +} + +Status DigestMD5Authenticator::GenerateFirstResponse(std::string *result) { + // TODO: Support auth-int and auth-conf + // Handle cipher + if (qop_ != "auth") { + return Status::Unimplemented(); + } + + std::stringstream ss; + GenerateCNonce(); + ss << "charset=utf-8,username=\"" << QuoteString(username_) << "\"" + << ",authzid=\"" << QuoteString(username_) << "\"" + << ",nonce=\"" << QuoteString(nonce_) << "\"" + << ",digest-uri=\"" << kDigestUri << "\"" + << ",maxbuf=" << kMaxBufferSize << ",cnonce=\"" << cnonce_ << "\""; + + if (realm_.size()) { + ss << ",realm=\"" << QuoteString(realm_) << "\""; + } + + ss << ",nc=" << std::hex 
<< std::setw(8) << std::setfill('0') + << ++nonce_count_; + std::string response_value; + GenerateResponseValue(&response_value); + ss << ",response=" << response_value; + *result = ss.str(); + return result->size() > 4096 ? Status::Error("Response too big") + : Status::OK(); +} + +/** + * Generate the response value specified in S 2.1.2.1 in RFC2831. + **/ +Status +DigestMD5Authenticator::GenerateResponseValue(std::string *response_value) { + std::stringstream begin_a1, a1_ss; + std::string a1, a2; + + if (qop_ == "auth") { + a2 = std::string("AUTHENTICATE:") + kDigestUri; + } else { + a2 = std::string("AUTHENTICATE:") + kDigestUri + + ":00000000000000000000000000000000"; + } + + begin_a1 << username_ << ":" << realm_ << ":" << password_; + a1_ss << GetMD5Digest(begin_a1.str()) << ":" << nonce_ << ":" << cnonce_ + << ":" << username_; + + std::stringstream combine_ss; + combine_ss << BinaryToHex(GetMD5Digest(a1_ss.str())) << ":" << nonce_ << ":" + << std::hex << std::setw(8) << std::setfill('0') << nonce_count_ + << ":" << cnonce_ << ":" << qop_ << ":" + << BinaryToHex(GetMD5Digest(a2)); + *response_value = BinaryToHex(GetMD5Digest(combine_ss.str())); + return Status::OK(); +} + +static std::string QuoteString(const std::string &src) { + std::string dst; + dst.resize(2 * src.size()); + size_t j = 0; + for (size_t i = 0; i < src.size(); ++i) { + if (src[i] == '"') { + dst[j++] = '\\'; + } + dst[j++] = src[i]; + } + dst.resize(j); + return dst; +} + +static std::string GetMD5Digest(const std::string &src) { + MD5_CTX ctx; + unsigned long long res[2]; + MD5_Init(&ctx); + MD5_Update(&ctx, src.c_str(), src.size()); + MD5_Final(reinterpret_cast<unsigned char *>(res), &ctx); + return std::string(reinterpret_cast<char *>(res), sizeof(res)); +} + +static std::string BinaryToHex(const std::string &src) { + std::stringstream ss; + ss << std::hex << std::setfill('0'); + for (size_t i = 0; i < src.size(); ++i) { + unsigned c = (unsigned)(static_cast<unsigned char>(src[i])); + ss << std::setw(2) << c; + } + return ss.str(); +} +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/statinfo.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/statinfo.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/statinfo.cc new file mode 100644 index 0000000..2fb744f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/statinfo.cc @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include <hdfspp/statinfo.h> +#include <sys/stat.h> +#include <ctime> /* localtime, strftime */ +#include <sstream> +#include <iomanip> + +namespace hdfs { + +StatInfo::StatInfo() + : file_type(0), + length(0), + permissions(0), + modification_time(0), + access_time(0), + block_replication(0), + blocksize(0), + fileid(0), + children_num(0) { +} + +std::string StatInfo::str() const { + char perms[11]; + perms[0] = file_type == StatInfo::IS_DIR ? 'd' : '-'; + perms[1] = permissions & S_IRUSR? 'r' : '-'; + perms[2] = permissions & S_IWUSR? 'w': '-'; + perms[3] = permissions & S_IXUSR? 'x': '-'; + perms[4] = permissions & S_IRGRP? 'r' : '-'; + perms[5] = permissions & S_IWGRP? 'w': '-'; + perms[6] = permissions & S_IXGRP? 'x': '-'; + perms[7] = permissions & S_IROTH? 'r' : '-'; + perms[8] = permissions & S_IWOTH? 'w': '-'; + perms[9] = permissions & S_IXOTH? 'x': '-'; + perms[10] = 0; + + // Convert modification time from milliseconds to seconds + const int time_field_length = 17; + time_t rawtime = modification_time/1000; + struct tm * timeinfo; + char buffer[time_field_length]; + timeinfo = localtime(&rawtime); + + strftime(buffer,time_field_length,"%Y-%m-%d %H:%M",timeinfo); + buffer[time_field_length-1] = 0; // null terminator + std::string time(buffer); + + std::stringstream ss; + ss << std::left << std::setw(12) << perms + << std::left << std::setw(3) << (!block_replication ? "-" : std::to_string(block_replication)) + << std::left << std::setw(15) << owner + << std::left << std::setw(15) << group + << std::right << std::setw(5) << length + << std::right << std::setw(time_field_length + 2) << time // modification_time + << " " << full_path; + return ss.str(); +} + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc new file mode 100644 index 0000000..4c5c7be --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +#include "hdfspp/status.h" + +#include <cassert> +#include <sstream> +#include <cstring> +#include <map> +#include <set> + +namespace hdfs { + +// Server side exceptions that we capture from the RpcResponseHeaderProto +const char * kStatusAccessControlException = "org.apache.hadoop.security.AccessControlException"; +const char * kPathIsNotDirectoryException = "org.apache.hadoop.fs.PathIsNotDirectoryException"; +const char * kSnapshotException = "org.apache.hadoop.hdfs.protocol.SnapshotException"; +const char * kStatusStandbyException = "org.apache.hadoop.ipc.StandbyException"; +const char * kStatusSaslException = "javax.security.sasl.SaslException"; +const char * kPathNotFoundException = "org.apache.hadoop.fs.InvalidPathException"; +const char * kPathNotFoundException2 = "java.io.FileNotFoundException"; +const char * kFileAlreadyExistsException = "org.apache.hadoop.fs.FileAlreadyExistsException"; +const char * kPathIsNotEmptyDirectoryException = "org.apache.hadoop.fs.PathIsNotEmptyDirectoryException"; + + +const static std::map<std::string, int> kKnownServerExceptionClasses = { + {kStatusAccessControlException, Status::kAccessControlException}, + {kPathIsNotDirectoryException, Status::kNotADirectory}, + {kSnapshotException, Status::kSnapshotProtocolException}, + {kStatusStandbyException, Status::kStandbyException}, + {kStatusSaslException, Status::kAuthenticationFailed}, + {kPathNotFoundException, Status::kPathNotFound}, + {kPathNotFoundException2, Status::kPathNotFound}, + {kFileAlreadyExistsException, Status::kFileAlreadyExists}, + {kPathIsNotEmptyDirectoryException, Status::kPathIsNotEmptyDirectory} + }; + +// Errors that retry cannot fix. TODO: complete the list. +const static std::set<int> noRetryExceptions = { + Status::kPermissionDenied, + Status::kAuthenticationFailed, + Status::kAccessControlException +}; + +Status::Status(int code, const char *msg1) + : code_(code) { + if(msg1) { + msg_ = msg1; + } +} + +Status::Status(int code, const char *exception_class_name, const char *exception_details) + : code_(code) { + // If we can assure this never gets nullptr args this can be + // in the initializer list. 
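+ // For example, an exception_class_name of "org.apache.hadoop.ipc.StandbyException" is remapped to Status::kStandbyException by the lookup below.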
+ if(exception_class_name) + exception_class_ = exception_class_name; + if(exception_details) + msg_ = exception_details; + + std::map<std::string, int>::const_iterator it = kKnownServerExceptionClasses.find(exception_class_); + if(it != kKnownServerExceptionClasses.end()) { + code_ = it->second; + } +} + + +Status Status::OK() { + return Status(); +} + +Status Status::InvalidArgument(const char *msg) { + return Status(kInvalidArgument, msg); +} + +Status Status::PathNotFound(const char *msg){ + return Status(kPathNotFound, msg); +} + +Status Status::ResourceUnavailable(const char *msg) { + return Status(kResourceUnavailable, msg); +} + +Status Status::PathIsNotDirectory(const char *msg) { + return Status(kNotADirectory, msg); +} + +Status Status::Unimplemented() { + return Status(kUnimplemented, ""); +} + +Status Status::Exception(const char *exception_class_name, const char *error_message) { + // Server side exception but can be represented by std::errc codes + if (exception_class_name && (strcmp(exception_class_name, kStatusAccessControlException) == 0) ) + return Status(kPermissionDenied, error_message); + else if (exception_class_name && (strcmp(exception_class_name, kStatusSaslException) == 0)) + return AuthenticationFailed(); + else if (exception_class_name && (strcmp(exception_class_name, kPathNotFoundException) == 0)) + return Status(kPathNotFound, error_message); + else if (exception_class_name && (strcmp(exception_class_name, kPathNotFoundException2) == 0)) + return Status(kPathNotFound, error_message); + else if (exception_class_name && (strcmp(exception_class_name, kPathIsNotDirectoryException) == 0)) + return Status(kNotADirectory, error_message); + else if (exception_class_name && (strcmp(exception_class_name, kSnapshotException) == 0)) + return Status(kInvalidArgument, error_message); + else if (exception_class_name && (strcmp(exception_class_name, kFileAlreadyExistsException) == 0)) + return Status(kFileAlreadyExists, error_message); + else if (exception_class_name && (strcmp(exception_class_name, kPathIsNotEmptyDirectoryException) == 0)) + return Status(kPathIsNotEmptyDirectory, error_message); + else + return Status(kException, exception_class_name, error_message); +} + +Status Status::Error(const char *error_message) { + return Exception("Exception", error_message); +} + +Status Status::AuthenticationFailed() { + return Status::AuthenticationFailed(nullptr); +} + +Status Status::AuthenticationFailed(const char *msg) { + std::string formatted = "AuthenticationFailed"; + if(msg) { + formatted += ": "; + formatted += msg; + } + return Status(kAuthenticationFailed, formatted.c_str()); +} + +Status Status::AuthorizationFailed() { + return Status::AuthorizationFailed(nullptr); +} + +Status Status::AuthorizationFailed(const char *msg) { + std::string formatted = "AuthorizationFailed"; + if(msg) { + formatted += ": "; + formatted += msg; + } + return Status(kPermissionDenied, formatted.c_str()); +} + +Status Status::Canceled() { + return Status(kOperationCanceled, "Operation canceled"); +} + +Status Status::InvalidOffset(const char *msg){ + return Status(kInvalidOffset, msg); +} + +std::string Status::ToString() const { + if (code_ == kOk) { + return "OK"; + } + std::stringstream ss; + if(!exception_class_.empty()) { + ss << exception_class_ << ":"; + } + ss << msg_; + return ss.str(); +} + +bool Status::notWorthRetry() const { + return noRetryExceptions.find(code_) != noRetryExceptions.end(); +} + +Status Status::MutexError(const char *msg) { + std::string formatted = 
"MutexError"; + if(msg) { + formatted += ": "; + formatted += msg; + } + return Status(kBusy/*try_lock failure errno*/, msg); +} + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/uri.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/uri.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/uri.cc new file mode 100644 index 0000000..9e9319b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/uri.cc @@ -0,0 +1,454 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include <hdfspp/uri.h> + +#include <uriparser2/uriparser/Uri.h> + +#include <string.h> +#include <sstream> +#include <cstdlib> +#include <cassert> +#include <limits> + +namespace hdfs +{ + +/////////////////////////////////////////////////////////////////////////////// +// +// Internal utilities +// +/////////////////////////////////////////////////////////////////////////////// + +const char kReserved[] = ":/?#[]@%+"; + +std::string URI::encode(const std::string & decoded) +{ + bool hasCharactersToEncode = false; + for (auto c : decoded) + { + if (isalnum(c) || (strchr(kReserved, c) == NULL)) + { + continue; + } + else + { + hasCharactersToEncode = true; + break; + } + } + + if (hasCharactersToEncode) + { + std::vector<char> buf(decoded.size() * 3 + 1); + uriEscapeA(decoded.c_str(), &buf[0], true, URI_BR_DONT_TOUCH); + return std::string(&buf[0]); + } + else + { + return decoded; + } +} + +std::string URI::decode(const std::string & encoded) +{ + bool hasCharactersToDecode = false; + for (auto c : encoded) + { + switch (c) + { + case '%': + case '+': + hasCharactersToDecode = true; + break; + default: + continue; + } + } + + if (hasCharactersToDecode) + { + std::vector<char> buf(encoded.size() + 1); + strncpy(&buf[0], encoded.c_str(), buf.size()); + uriUnescapeInPlaceExA(&buf[0], true, URI_BR_DONT_TOUCH); + return std::string(&buf[0]); + } + else + { + return encoded; + } +} + +std::vector<std::string> split(const std::string input, char separator) +{ + std::vector<std::string> result; + + if (!input.empty()) + { + const char * remaining = input.c_str(); + if (*remaining == '/') + remaining++; + + const char * next_end = strchr(remaining, separator); + while (next_end) { + int len = next_end - remaining; + if (len) + result.push_back(std::string(remaining, len)); + else + result.push_back(""); + remaining = next_end + 1; + next_end = strchr(remaining, separator); + } + result.push_back(std::string(remaining)); + } + + return result; +} + + + 
+/////////////////////////////////////////////////////////////////////////////// +// +// Parsing +// +/////////////////////////////////////////////////////////////////////////////// + + + +std::string copy_range(const UriTextRangeA *r) { + const int size = r->afterLast - r->first; + if (size) { + return std::string(r->first, size); + } + return ""; +} + +bool parse_int(const UriTextRangeA *r, int32_t& result) +{ + std::string int_str = copy_range(r); + if(!int_str.empty()) { + errno = 0; + unsigned long val = ::strtoul(int_str.c_str(), nullptr, 10); + if(errno == 0 && val < std::numeric_limits<uint16_t>::max()) { + result = val; + return true; + } else { + return false; + } + } + return true; +} + + +std::vector<std::string> copy_path(const UriPathSegmentA *ps) { + std::vector<std::string> result; + if (nullptr == ps) + return result; + + for (; ps != 0; ps = ps->next) { + result.push_back(copy_range(&ps->text)); + } + + return result; +} + +void parse_user_info(const UriTextRangeA *r, std::string * user, std::string * pass) { + // Output parameters + assert(user); + assert(pass); + + std::string user_and_password = copy_range(r); + if (!user_and_password.empty()) { + const char * begin = user_and_password.c_str(); + const char * colon_loc = strchr(begin, ':'); + if (colon_loc) { + *user = std::string(begin, colon_loc - begin); // length up to, not including, the ':' + *pass = colon_loc + 1; + } else { + *user = user_and_password; + } + } +} + + +std::vector<URI::Query> parse_queries(const char *first, const char * afterLast) { + std::vector<URI::Query> result; + UriQueryListA * query; + int count; + int dissect_result = uriDissectQueryMallocExA(&query, &count, first, afterLast, false, URI_BR_DONT_TOUCH); + if (URI_SUCCESS == dissect_result) { + for (auto ps = query; ps != nullptr; ps = ps->next) { + std::string key = ps->key ? URI::encode(ps->key) : ""; + std::string value = ps->value ? URI::encode(ps->value) : ""; + result.emplace_back(key, value); + } + uriFreeQueryListA(query); + } + + return result; +} + +// Parse a string into a URI. Throws an hdfs::uri_parse_error if the URI is malformed.
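+// For example (sketch; the hostname is illustrative): parse_from_string("hdfs://nn1.example.com:8020/tmp")
+// yields scheme "hdfs", host "nn1.example.com", port 8020, and path "/tmp".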
+URI URI::parse_from_string(const std::string &str) +{ + URI ret; + bool ok = true; + + UriParserStateA state; + memset(&state, 0, sizeof(state)); + UriUriA uu; + + state.uri = &uu; + int parseResult = uriParseUriA(&state, str.c_str()); + ok &= (parseResult == URI_SUCCESS); + + if (ok) { + ret.scheme = copy_range(&uu.scheme); + ret.host = copy_range(&uu.hostText); + ok &= parse_int(&uu.portText, ret._port); + ret.path = copy_path(uu.pathHead); + ret.queries = parse_queries(uu.query.first, uu.query.afterLast); + ret.fragment = copy_range(&uu.fragment); + parse_user_info(&uu.userInfo, &ret.user, &ret.pass); + } + // Free parser state exactly once, on both the success and failure paths. + uriFreeUriMembersA(&uu); + + if (ok) { + return ret; + } else { + throw uri_parse_error(str); + } +} + +/////////////////////////////////////////////////////////////////////////////// +// +// Getters and setters +// +/////////////////////////////////////////////////////////////////////////////// + +URI::URI() : _port(-1) {} + +URI::Query::Query(const std::string& k, const std::string& v) : key(k), value(v) {} + +std::string URI::str(bool encoded_output) const +{ + std::stringstream ss; + if (!scheme.empty()) ss << from_encoded(encoded_output, scheme) << "://"; + if (!user.empty() || !pass.empty()) { + if (!user.empty()) ss << from_encoded(encoded_output, user); + if (!pass.empty()) ss << ":" << from_encoded(encoded_output, pass); + ss << "@"; + } + if (has_authority()) ss << build_authority(encoded_output); + if (!path.empty()) ss << get_path(encoded_output); + if (!queries.empty()) ss << "?" << get_query(encoded_output); + if (!fragment.empty()) ss << "#" << from_encoded(encoded_output, fragment); + + return ss.str(); +} + +bool URI::has_authority() const +{ + return (!host.empty()) || (has_port()); +} + +std::string URI::build_authority(bool encoded_output) const +{ + std::stringstream ss; + ss << URI::from_encoded(encoded_output, host); + if (has_port()) + { + ss << ":" << _port; + } + return ss.str(); +} + +std::string URI::get_scheme(bool encoded_output) const { + return from_encoded(encoded_output,scheme); +} + +void URI::set_scheme(const std::string &s, bool encoded_input) { + scheme = to_encoded(encoded_input,s); +} + +std::string URI::get_host(bool encoded_output) const { + return from_encoded(encoded_output,host); +} + +void URI::set_host(const std::string& h, bool encoded_input) { + host = to_encoded(encoded_input,h); +} + +bool URI::has_port() const { + return _port != -1; +} + +uint16_t URI::get_port() const { + return (uint16_t)_port; +} + +uint16_t URI::get_port_or_default(uint16_t val) const { + return has_port() ?
(uint16_t)_port : val; +} + +void URI::set_port(uint16_t p) +{ + _port = (int32_t)p & 0xFFFF; +} + +void URI::clear_port() +{ + _port = -1; +} + +std::string URI::get_path(bool encoded_output) const +{ + std::ostringstream out; + for (const std::string& s: path) { + out << "/" << from_encoded(encoded_output, s); + } + return out.str(); +} + +std::vector<std::string> URI::get_path_elements(bool encoded_output) const +{ + std::vector<std::string> result; + for (const std::string& path_elem: path) { + result.push_back(from_encoded(encoded_output, path_elem)); + } + + return result; +} + +void URI::parse_path(bool input_encoded, const std::string &input_path) +{ + std::vector<std::string> split_path = split(input_path, '/'); + for (const std::string& s: split_path) { + path.push_back(to_encoded(input_encoded, s)); + } +} + +// Mostly copied and modified from uriparser2.c + +void URI::set_path(const std::string &p, bool encoded_input) { + parse_path(encoded_input, p); +} + +void URI::add_path(const std::string &p, bool encoded_input) +{ + path.push_back(to_encoded(encoded_input, p)); +} + +std::string URI::get_query(bool encoded_output) const { + bool first = true; + std::stringstream ss; + for (const Query& q: queries) { + if (!first) { + ss << "&"; + } + ss << from_encoded(encoded_output, q.key) << "=" << from_encoded(encoded_output, q.value); + first = false; + } + + return ss.str(); +} + +std::vector<URI::Query> URI::get_query_elements(bool encoded_output) const +{ + std::vector<Query> result; + for (const Query& q: queries) { + std::string key = from_encoded(encoded_output, q.key); + std::string value = from_encoded(encoded_output, q.value); + result.emplace_back(key, value); + } + + return result; +} + +void URI::set_query(const std::string &q) { + queries = parse_queries(q.c_str(), q.c_str() + q.size() + 1); +} + + +void URI::add_query(const std::string &name, const std::string & value, bool encoded_input) +{ + queries.emplace_back(to_encoded(encoded_input, name), to_encoded(encoded_input, value)); +} + +void URI::remove_query(const std::string &q_name, bool encoded_input) +{ + if (queries.empty()) + return; + + // This is the one place we need to do decoded comparisons + std::string decoded_key = encoded_input ? decode(q_name) : q_name; + + for (int i = queries.size() - 1; i >= 0; i--) { + if (decode(queries[i].key) == decoded_key) { + queries.erase(queries.begin() + i); + } + } +} + +std::string URI::get_fragment(bool encoded_output) const { + return from_encoded(encoded_output, fragment); +} + +void URI::set_fragment(const std::string &f, bool encoded_input) { + fragment = to_encoded(encoded_input,f); +} + +std::string URI::from_encoded(bool encoded_output, const std::string & input) { + return encoded_output ? input : decode(input); +} + +std::string URI::to_encoded(bool encoded_input, const std::string & input) { + return encoded_input ? 
+
+std::string URI::GetDebugString() const {
+  std::stringstream ss;
+  ss << std::endl;
+  ss << "\t" << "uri.str() = \"" << str() << "\"" << std::endl;
+  ss << "\t" << "uri.get_scheme() = \"" << get_scheme() << "\"" << std::endl;
+  ss << "\t" << "uri.get_host() = \"" << get_host() << "\"" << std::endl;
+
+  if(_port == -1)
+    ss << "\t" << "uri.get_port() = invalid (uninitialized)" << std::endl;
+  else
+    ss << "\t" << "uri.get_port() = \"" << _port << "\"" << std::endl;
+
+  ss << "\t" << "uri.get_path() = \"" << get_path() << "\"" << std::endl;
+  ss << "\t" << "uri.get_fragment() = \"" << get_fragment() << "\"" << std::endl;
+
+  std::vector<Query> query_elems = get_query_elements();
+
+  if(query_elems.size() > 0)
+    ss << "\t" << "Query elements:" << std::endl;
+
+  for(auto qry = query_elems.begin(); qry != query_elems.end(); qry++) {
+    ss << "\t\t" << qry->key << " -> " << qry->value << std::endl;
+  }
+
+  return ss.str();
+}
+
+} // end namespace hdfs

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc
new file mode 100644
index 0000000..375f951
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "common/util.h"
+#include "common/util_c.h"
+
+#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
+
+#include <exception>
+#include <sstream>
+#include <iostream>
+#include <iomanip>
+#include <thread>
+
+namespace hdfs {
+
+bool ReadDelimitedPBMessage(::google::protobuf::io::CodedInputStream *in,
+                            ::google::protobuf::MessageLite *msg) {
+  uint32_t size = 0;
+  in->ReadVarint32(&size);
+  auto limit = in->PushLimit(size);
+  bool res = msg->ParseFromCodedStream(in);
+  in->PopLimit(limit);
+
+  return res;
+}
+
+std::string SerializeDelimitedProtobufMessage(const ::google::protobuf::MessageLite *msg,
+                                              bool *err) {
+  namespace pbio = ::google::protobuf::io;
+
+  std::string buf;
+
+  int size = msg->ByteSize();
+  buf.reserve(pbio::CodedOutputStream::VarintSize32(size) + size);
+  pbio::StringOutputStream ss(&buf);
+  pbio::CodedOutputStream os(&ss);
+  os.WriteVarint32(size);
+
+  // Serialize unconditionally; gating the serialization on err being
+  // non-null would return a buffer containing only the length varint.
+  bool serialized = msg->SerializeToCodedStream(&os);
+  if(err)
+    *err = serialized;
+
+  return buf;
+}
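
The two functions above are inverses: SerializeDelimitedProtobufMessage writes a varint length followed by the message bytes (the java protobuf writeDelimitedTo format), and ReadDelimitedPBMessage pops exactly that many bytes back off a CodedInputStream. A round-trip sketch, templated so it compiles against any generated MessageLite subtype (the function name and structure are illustrative, not part of the patch):

    #include "common/util.h"
    #include <google/protobuf/io/coded_stream.h>

    // MsgT stands in for any generated ::google::protobuf::MessageLite type.
    template <typename MsgT>
    bool RoundTrip(const MsgT &original, MsgT *copy) {
      bool ok = false;
      std::string wire = hdfs::SerializeDelimitedProtobufMessage(&original, &ok);
      if (!ok)
        return false;

      // CodedInputStream over the raw bytes; ReadDelimitedPBMessage consumes
      // the leading varint length and then parses exactly that many bytes.
      ::google::protobuf::io::CodedInputStream in(
          reinterpret_cast<const uint8_t *>(wire.data()), wire.size());
      return hdfs::ReadDelimitedPBMessage(&in, copy);
    }
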
+
+std::string GetRandomClientName() {
+  std::vector<unsigned char> buf(8);
+  RAND_pseudo_bytes(&buf[0], 8);
+
+  std::ostringstream oss;
+  oss << "DFSClient_" << getpid() << "_"
+      << std::this_thread::get_id() << "_"
+      << std::hex << std::uppercase << std::setfill('0');
+  // std::setw resets after every insertion, so it has to be reapplied per
+  // byte to get consistent two-digit groups.
+  for (unsigned char b: buf)
+    oss << std::setw(2) << static_cast<unsigned>(b);
+
+  return oss.str();
+}
+
+std::string Base64Encode(const std::string &src) {
+  // encoded size is (sizeof(buf) + 2) / 3 * 4
+  static const std::string base64_chars =
+      "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
+      "abcdefghijklmnopqrstuvwxyz"
+      "0123456789+/";
+  std::string ret;
+  int i = 0;
+  int j = 0;
+  unsigned char char_array_3[3];
+  unsigned char char_array_4[4];
+  unsigned const char *bytes_to_encode = reinterpret_cast<unsigned const char *>(&src[0]);
+  unsigned int in_len = src.size();
+
+  while (in_len--) {
+    char_array_3[i++] = *(bytes_to_encode++);
+    if (i == 3) {
+      char_array_4[0] = (char_array_3[0] & 0xfc) >> 2;
+      char_array_4[1] = ((char_array_3[0] & 0x03) << 4) + ((char_array_3[1] & 0xf0) >> 4);
+      char_array_4[2] = ((char_array_3[1] & 0x0f) << 2) + ((char_array_3[2] & 0xc0) >> 6);
+      char_array_4[3] = char_array_3[2] & 0x3f;
+
+      for (i = 0; i < 4; i++)
+        ret += base64_chars[char_array_4[i]];
+      i = 0;
+    }
+  }
+
+  if (i) {
+    for (j = i; j < 3; j++)
+      char_array_3[j] = '\0';
+
+    char_array_4[0] = (char_array_3[0] & 0xfc) >> 2;
+    char_array_4[1] = ((char_array_3[0] & 0x03) << 4) + ((char_array_3[1] & 0xf0) >> 4);
+    char_array_4[2] = ((char_array_3[1] & 0x0f) << 2) + ((char_array_3[2] & 0xc0) >> 6);
+    char_array_4[3] = char_array_3[2] & 0x3f;
+
+    for (j = 0; j < i + 1; j++)
+      ret += base64_chars[char_array_4[j]];
+
+    while (i++ < 3)
+      ret += '=';
+  }
+  return ret;
+}
+
+std::string SafeDisconnect(asio::ip::tcp::socket *sock) {
+  std::string err;
+  if(sock && sock->is_open()) {
+    /**
+     * Even though we just checked that the socket is open it's possible
+     * it isn't in a state where it can properly send or receive.  If that's
+     * the case asio will turn the underlying error codes from shutdown()
+     * and close() into unhelpfully named std::exceptions.  Due to the
+     * relatively innocuous nature of most of these error codes it's better
+     * to just catch and return a flag so the caller can log failure.
+     **/
+
+    try {
+      sock->shutdown(asio::ip::tcp::socket::shutdown_both);
+    } catch (const std::exception &e) {
+      err = std::string("shutdown() threw: ") + e.what();
+    }
+
+    try {
+      sock->close();
+    } catch (const std::exception &e) {
+      // don't append if shutdown() already failed, the first failure is the useful one
+      if(err.empty())
+        err = std::string("close() threw: ") + e.what();
+    }
+  }
+  return err;
+}
+
+bool IsHighBitSet(uint64_t num) {
+  uint64_t firstBit = (uint64_t) 1 << 63;
+  return (num & firstBit) != 0;
+}
+
+} // end namespace hdfs
+
+void ShutdownProtobufLibrary_C() {
+  google::protobuf::ShutdownProtobufLibrary();
+}
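
The padding branch in Base64Encode is the part most worth eyeballing; the RFC 4648 test vectors exercise all three residue cases. A throwaway check, not part of the patch:

    #include <cassert>
    #include <string>

    void TestBase64() {
      assert(hdfs::Base64Encode("") == "");
      assert(hdfs::Base64Encode("f") == "Zg==");      // one residual byte -> two '='
      assert(hdfs::Base64Encode("fo") == "Zm8=");     // two residual bytes -> one '='
      assert(hdfs::Base64Encode("foo") == "Zm9v");    // full group, no padding
      assert(hdfs::Base64Encode("foobar") == "Zm9vYmFy");
    }
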

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h
new file mode 100644
index 0000000..be902bd
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIB_COMMON_UTIL_H_
+#define LIB_COMMON_UTIL_H_
+
+#include "hdfspp/status.h"
+#include "common/logging.h"
+
+#include <sstream>
+#include <mutex>
+#include <string>
+
+#include <asio/error_code.hpp>
+#include <openssl/rand.h>
+
+#include <google/protobuf/message_lite.h>
+#include <google/protobuf/io/coded_stream.h>
+#include <asio.hpp>
+
+namespace hdfs {
+
+// typedefs based on code that's repeated everywhere
+typedef std::lock_guard<std::mutex> mutex_guard;
+
+static inline Status ToStatus(const ::asio::error_code &ec) {
+  if (ec) {
+    return Status(ec.value(), ec.message().c_str());
+  } else {
+    return Status::OK();
+  }
+}
+
+// Determine the size of the buffer that needs to be allocated in order to
+// serialize msg in delimited format
+static inline int DelimitedPBMessageSize(const ::google::protobuf::MessageLite *msg) {
+  size_t size = msg->ByteSize();
+  return ::google::protobuf::io::CodedOutputStream::VarintSize32(size) + size;
+}
+
+// Construct msg from the input held in the CodedInputStream.
+// Return false on failure, otherwise return true.
+bool ReadDelimitedPBMessage(::google::protobuf::io::CodedInputStream *in,
+                            ::google::protobuf::MessageLite *msg);
+
+// Serialize msg into a delimited form (java protobuf compatible).
+// err, if not null, will be set to false on failure.
+std::string SerializeDelimitedProtobufMessage(const ::google::protobuf::MessageLite *msg,
+                                              bool *err);
+
+std::string Base64Encode(const std::string &src);
+
+// Return a new high-entropy client name
+std::string GetRandomClientName();
+
+// Returns true if _someone_ is holding the lock (not necessarily this thread,
+// since a std::mutex doesn't track which thread is holding it)
+template<class T>
+bool lock_held(T & mutex) {
+  bool result = !mutex.try_lock();
+  if (!result)
+    mutex.unlock();
+  return result;
+}
+
+// Shutdown and close a socket safely; will check if the socket is open and
+// catch anything thrown by asio.
+// Returns a string containing the error message on failure, otherwise an empty string.
+std::string SafeDisconnect(asio::ip::tcp::socket *sock);
+
+// The following helper function is used for classes that look like the following:
+//
+// template <typename socket_like_object>
+// class ObjectThatHoldsSocket {
+//   socket_like_object sock_;
+//   void DoSomethingWithAsioTcpSocket();
+// }
+//
+// The trick here is that ObjectThatHoldsSocket may be templated on a mock socket
+// in mock tests.  If you have a method that explicitly needs to call some asio
+// method unrelated to the mock test you need a way of making sure socket_like_object
+// is, in fact, an asio::ip::tcp::socket.  Otherwise the mocks need to implement
+// lots of no-op boilerplate.  This will return the value of the input param if
+// it's an asio socket, and nullptr if it's anything else.
+
+template <typename sock_t>
+inline asio::ip::tcp::socket *get_asio_socket_ptr(sock_t *s) {
+  (void)s;
+  return nullptr;
+}
+template<>
+inline asio::ip::tcp::socket *get_asio_socket_ptr<asio::ip::tcp::socket>
+                              (asio::ip::tcp::socket *s) {
+  return s;
+}
+
+// Check if the high bit is set
+bool IsHighBitSet(uint64_t num);
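
A sketch of the compile-time dispatch get_asio_socket_ptr provides. The class, method, and socket option here are hypothetical illustrations of the pattern the comment above describes, not code from this patch:

    #include "common/util.h"
    #include <asio.hpp>
    #include <utility>

    template <typename socket_like_object>
    class ObjectThatHoldsSocket {
     public:
      explicit ObjectThatHoldsSocket(socket_like_object &&s) : sock_(std::move(s)) {}

      void MaybeEnableNoDelay() {
        // Resolves at compile time: the specialization returns &sock_ only when
        // socket_like_object really is asio::ip::tcp::socket; mock instantiations
        // fall through to the primary template and get nullptr.
        asio::ip::tcp::socket *real_sock = hdfs::get_asio_socket_ptr(&sock_);
        if (real_sock && real_sock->is_open()) {
          real_sock->set_option(asio::ip::tcp::no_delay(true));  // asio-only tuning
        }
        // ...logic that only needs the common socket-like interface goes here...
      }

     private:
      socket_like_object sock_;
    };
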
+
+// Provide a way to do an atomic swap on a callback.
+// SetCallback, AtomicSwapCallback, and GetCallback can only be called once each.
+// AtomicSwapCallback and GetCallback must only be called after SetCallback.
+//
+// We can't throw on error, and since the callback is templated it's tricky to
+// generate generic dummy callbacks.  Complain loudly in the log and get good
+// test coverage.  It shouldn't be too hard to avoid invalid states.
+template <typename CallbackType>
+class SwappableCallbackHolder {
+ private:
+  std::mutex state_lock_;
+  CallbackType callback_;
+  bool callback_set_ = false;
+  bool callback_swapped_ = false;
+  bool callback_accessed_ = false;
+ public:
+  bool IsCallbackSet() {
+    mutex_guard swap_lock(state_lock_);
+    return callback_set_;
+  }
+
+  bool IsCallbackAccessed() {
+    mutex_guard swap_lock(state_lock_);
+    return callback_accessed_;
+  }
+
+  bool SetCallback(const CallbackType& callback) {
+    mutex_guard swap_lock(state_lock_);
+    if(callback_set_ || callback_swapped_ || callback_accessed_) {
+      LOG_ERROR(kAsyncRuntime, << "SetCallback violates access invariants.")
+      return false;
+    }
+    callback_ = callback;
+    callback_set_ = true;
+    return true;
+  }
+
+  CallbackType AtomicSwapCallback(const CallbackType& replacement, bool& swapped) {
+    mutex_guard swap_lock(state_lock_);
+    if(!callback_set_ || callback_swapped_) {
+      LOG_ERROR(kAsyncRuntime, << "AtomicSwapCallback violates access invariants.")
+      // Return without swapping; falling through here would perform the swap
+      // anyway and overwrite the error indication.
+      swapped = false;
+      return callback_;
+    } else if (callback_accessed_) {
+      // Common case where callback has been invoked but caller may not know
+      LOG_DEBUG(kAsyncRuntime, << "AtomicSwapCallback called after callback has been accessed");
+      swapped = false;
+      return callback_;
+    }
+
+    CallbackType old = callback_;
+    callback_ = replacement;
+    callback_swapped_ = true;
+    swapped = true;
+    return old;
+  }
+
+  CallbackType GetCallback() {
+    mutex_guard swap_lock(state_lock_);
+    if(!callback_set_ || callback_accessed_) {
+      LOG_ERROR(kAsyncRuntime, << "GetCallback violates access invariants.")
+    }
+    callback_accessed_ = true;
+    return callback_;
+  }
+};
+
+}
+
+#endif
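
A hypothetical driver showing the intended once-each call sequence for the holder; the lambdas and the cancellation framing are illustrative, not drawn from a caller in this patch:

    #include "common/util.h"
    #include <functional>

    void Demo() {
      hdfs::SwappableCallbackHolder<std::function<void(int)>> holder;

      holder.SetCallback([](int v) { /* original continuation */ (void)v; });

      // Later, e.g. during cancellation, substitute a no-op atomically.
      bool swapped = false;
      auto old_cb = holder.AtomicSwapCallback([](int) { /* cancelled: ignore */ }, swapped);
      if (!swapped) {
        // The callback was already consumed by the normal completion path,
        // so there is nothing left to cancel.
      }

      // Exactly one GetCallback hands the current callback to whoever fires it.
      auto cb = holder.GetCallback();
      cb(0);
      (void)old_cb;
    }
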

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util_c.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util_c.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util_c.h
new file mode 100644
index 0000000..c7db7d2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util_c.h
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIB_COMMON_UTIL_C_H_
+#define LIB_COMMON_UTIL_C_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+  void ShutdownProtobufLibrary_C();
+
+#ifdef __cplusplus
+} /* end extern "C" */
+#endif
+
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/CMakeLists.txt
new file mode 100644
index 0000000..fda51a1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/CMakeLists.txt
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+add_library(connection_obj OBJECT datanodeconnection.cc)
+add_dependencies(connection_obj proto)
+add_library(connection $<TARGET_OBJECTS:connection_obj>)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc
new file mode 100644
index 0000000..27cd666
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "datanodeconnection.h"
+#include "common/util.h"
+
+namespace hdfs {
+
+DataNodeConnection::~DataNodeConnection(){}
+DataNodeConnectionImpl::~DataNodeConnectionImpl(){}
+
+DataNodeConnectionImpl::DataNodeConnectionImpl(asio::io_service * io_service,
+                                               const ::hadoop::hdfs::DatanodeInfoProto &dn_proto,
+                                               const hadoop::common::TokenProto *token,
+                                               LibhdfsEvents *event_handlers) : event_handlers_(event_handlers)
+{
+  using namespace ::asio::ip;
+
+  conn_.reset(new tcp::socket(*io_service));
+  auto datanode_addr = dn_proto.id();
+  endpoints_[0] = tcp::endpoint(address::from_string(datanode_addr.ipaddr()),
+                                datanode_addr.xferport());
+  uuid_ = dn_proto.id().datanodeuuid();
+
+  if (token) {
+    token_.reset(new hadoop::common::TokenProto());
+    token_->CheckTypeAndMergeFrom(*token);
+  }
+}
+
+void DataNodeConnectionImpl::Connect(
+    std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)> handler) {
+  // Keep the DN from being freed until we're done
+  mutex_guard state_lock(state_lock_);
+  auto shared_this = shared_from_this();
+  asio::async_connect(*conn_, endpoints_.begin(), endpoints_.end(),
+      [shared_this, handler](const asio::error_code &ec, std::array<asio::ip::tcp::endpoint, 1>::iterator it) {
+        (void)it;
+        handler(ToStatus(ec), shared_this); });
+}
+
+void DataNodeConnectionImpl::Cancel() {
+  std::string err;
+
+  { // scope the lock for disconnect only, the log has its own lock
+    mutex_guard state_lock(state_lock_);
+    err = SafeDisconnect(conn_.get());
+  }
+
+  if(!err.empty()) {
+    LOG_WARN(kBlockReader, << "Error disconnecting socket in DataNodeConnectionImpl::Cancel, " << err);
+  }
+}
+
+}
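
The shared_this capture in Connect is the standard asio keep-alive idiom: holding a shared_ptr to *this inside the completion handler guarantees the object outlives the pending async operation even if every external owner drops it. A stripped-down sketch of just that idiom, with a hypothetical timer-based class standing in for the connection:

    #include <asio.hpp>
    #include <chrono>
    #include <memory>

    class Pinger : public std::enable_shared_from_this<Pinger> {
     public:
      explicit Pinger(asio::io_service &io) : timer_(io) {}

      void Start() {
        auto self = shared_from_this();  // ref count +1, owned by the callback
        timer_.expires_from_now(std::chrono::seconds(1));
        timer_.async_wait([self](const asio::error_code &ec) {
          // Even if all external shared_ptrs are released while the wait is
          // pending, 'self' keeps the Pinger alive until this handler runs.
          if (!ec) { /* do periodic work */ }
        });
      }

     private:
      asio::steady_timer timer_;
    };

    // Usage: the object must already be owned by a shared_ptr before
    // shared_from_this() is legal, hence make_shared:
    //   auto p = std::make_shared<Pinger>(io_service);
    //   p->Start();
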

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h
new file mode 100644
index 0000000..21193b3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBHDFSPP_LIB_CONNECTION_DATANODECONNECTION_H_
+#define LIBHDFSPP_LIB_CONNECTION_DATANODECONNECTION_H_
+
+#include "common/hdfs_ioservice.h"
+#include "common/async_stream.h"
+#include "ClientNamenodeProtocol.pb.h"
+#include "common/libhdfs_events_impl.h"
+#include "common/logging.h"
+#include "common/util.h"
+#include "common/new_delete.h"
+
+#include "asio.hpp"
+
+namespace hdfs {
+
+class DataNodeConnection : public AsyncStream {
+public:
+  MEMCHECKED_CLASS(DataNodeConnection)
+  std::string uuid_;
+  std::unique_ptr<hadoop::common::TokenProto> token_;
+
+  virtual ~DataNodeConnection();
+  virtual void Connect(std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)> handler) = 0;
+  virtual void Cancel() = 0;
+};
+
+struct SocketDeleter {
+  inline void operator()(asio::ip::tcp::socket *sock) {
+    // Cancel may have already closed the socket.
+    std::string err = SafeDisconnect(sock);
+    if(!err.empty()) {
+      LOG_WARN(kBlockReader, << "Error disconnecting socket: " << err);
+    }
+    delete sock;
+  }
+};
+
+class DataNodeConnectionImpl : public DataNodeConnection, public std::enable_shared_from_this<DataNodeConnectionImpl> {
+private:
+  // held (briefly) while posting async ops to the asio task queue
+  std::mutex state_lock_;
+public:
+  std::unique_ptr<asio::ip::tcp::socket, SocketDeleter> conn_;
+  std::array<asio::ip::tcp::endpoint, 1> endpoints_;
+  std::string uuid_;
+  LibhdfsEvents *event_handlers_;
+
+  virtual ~DataNodeConnectionImpl();
+  DataNodeConnectionImpl(asio::io_service * io_service, const ::hadoop::hdfs::DatanodeInfoProto &dn_proto,
+                         const hadoop::common::TokenProto *token,
+                         LibhdfsEvents *event_handlers);
+
+  void Connect(std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)> handler) override;
+
+  void Cancel() override;
+
+  void async_read_some(const MutableBuffers &buf,
+                       std::function<void (const asio::error_code & error, std::size_t bytes_transferred)> handler)
+      override {
+    event_handlers_->call("DN_read_req", "", "", buf.end() - buf.begin());
+
+    mutex_guard state_lock(state_lock_);
+    conn_->async_read_some(buf, handler);
+  }
+
+  void async_write_some(const ConstBuffers &buf,
+                        std::function<void (const asio::error_code & error, std::size_t bytes_transferred)> handler)
+      override {
+    event_handlers_->call("DN_write_req", "", "", buf.end() - buf.begin());
+
+    mutex_guard state_lock(state_lock_);
+    conn_->async_write_some(buf, handler);
+  }
+};
+
+}
+
+#endif
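
SocketDeleter above is the unique_ptr custom-deleter pattern: the deleter runs whenever conn_ is reset or destroyed, so the socket is always shut down through SafeDisconnect rather than a bare delete. A generic illustration of the same pattern with FILE handles (everything here is illustrative, nothing is from the patch):

    #include <cstdio>
    #include <memory>

    struct FileCloser {
      void operator()(std::FILE *f) const {
        if (f) std::fclose(f);  // runs exactly once, whenever the unique_ptr dies
      }
    };

    int main() {
      std::unique_ptr<std::FILE, FileCloser> fp(std::fopen("/tmp/example", "w"));
      // fp behaves like a normal pointer; FileCloser replaces 'delete', so the
      // resource is torn down correctly on early return or exception as well.
      return fp ? 0 : 1;
    }
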

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt
new file mode 100644
index 0000000..624cda5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+add_library(fs_obj OBJECT filesystem.cc filesystem_sync.cc filehandle.cc bad_datanode_tracker.cc namenode_operations.cc)
+add_dependencies(fs_obj proto)
+add_library(fs $<TARGET_OBJECTS:fs_obj>)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/bad_datanode_tracker.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/bad_datanode_tracker.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/bad_datanode_tracker.cc
new file mode 100644
index 0000000..be9020f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/bad_datanode_tracker.cc
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "bad_datanode_tracker.h"
+
+namespace hdfs {
+
+NodeExclusionRule::~NodeExclusionRule() {}
+
+BadDataNodeTracker::BadDataNodeTracker(const Options& options)
+    : timeout_duration_(options.host_exclusion_duration),
+      test_clock_shift_(0) {}
+
+BadDataNodeTracker::~BadDataNodeTracker() {}
+
+void BadDataNodeTracker::AddBadNode(const std::string& dn) {
+  std::lock_guard<std::mutex> update_lock(datanodes_update_lock_);
+  datanodes_[dn] = Clock::now();
+}
+
+bool BadDataNodeTracker::IsBadNode(const std::string& dn) {
+  std::lock_guard<std::mutex> update_lock(datanodes_update_lock_);
+
+  if (datanodes_.count(dn) == 1) {
+    const TimePoint& entered_time = datanodes_[dn];
+    if (TimeoutExpired(entered_time)) {
+      datanodes_.erase(dn);
+      return false;
+    }
+    /* node in set and still marked bad */
+    return true;
+  }
+  return false;
+}
+
+void BadDataNodeTracker::TEST_set_clock_shift(int t) { test_clock_shift_ = t; }
+
+bool BadDataNodeTracker::TimeoutExpired(const TimePoint& t) {
+  TimePoint threshold = Clock::now() -
+                        std::chrono::milliseconds(timeout_duration_) +
+                        std::chrono::milliseconds(test_clock_shift_);
+  if (t < threshold) {
+    return true;
+  }
+  return false;
+}
+
+ExclusionSet::ExclusionSet(const std::set<std::string>& excluded)
+    : excluded_(excluded) {}
+
+ExclusionSet::~ExclusionSet() {}
+
+bool ExclusionSet::IsBadNode(const std::string& node_uuid) {
+  return excluded_.count(node_uuid) == 1;
+}
+
+}
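
A hypothetical caller-side flow for the tracker, showing how exclusion and automatic aging fit together. The selection function and its signature are illustrative; the only facts taken from the patch are that AddBadNode records a failure time, that IsBadNode erases entries whose exclusion window has lapsed, and that Options::host_exclusion_duration drives the window:

    #include "bad_datanode_tracker.h"
    #include <string>
    #include <vector>

    void PickDataNode(hdfs::BadDataNodeTracker &tracker,
                      const std::vector<std::string> &candidate_uuids,
                      std::string *chosen) {
      for (const std::string &uuid : candidate_uuids) {
        // IsBadNode also ages out entries whose exclusion window has expired,
        // so a node that failed a while ago becomes eligible again on its own.
        if (!tracker.IsBadNode(uuid)) {
          *chosen = uuid;
          return;
        }
      }
      chosen->clear();  // every candidate is currently excluded
    }

    // On a read failure the caller would report the node back:
    //   tracker.AddBadNode(uuid);
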