HDFS-11106: libhdfs++: Some refactoring to better organize files (part 2). Contributed by James Clampffer.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d75104ea Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d75104ea Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d75104ea Branch: refs/heads/HDFS-12996 Commit: d75104eacf5ba01e13a5e2a0c3e9dd633fcfad8c Parents: da208bb Author: James <j...@apache.org> Authored: Mon Mar 6 12:30:19 2017 -0500 Committer: Hanisha Koneru <hanishakon...@apache.org> Committed: Mon Mar 26 11:11:03 2018 -0700 ---------------------------------------------------------------------- .../native/libhdfspp/lib/rpc/CMakeLists.txt | 2 +- .../libhdfspp/lib/rpc/namenode_tracker.cc | 134 +++++ .../native/libhdfspp/lib/rpc/namenode_tracker.h | 81 +++ .../main/native/libhdfspp/lib/rpc/request.cc | 190 +++++++ .../src/main/native/libhdfspp/lib/rpc/request.h | 87 +++ .../native/libhdfspp/lib/rpc/rpc_connection.cc | 563 ------------------- .../native/libhdfspp/lib/rpc/rpc_connection.h | 552 +++++------------- .../libhdfspp/lib/rpc/rpc_connection_impl.cc | 446 +++++++++++++++ .../libhdfspp/lib/rpc/rpc_connection_impl.h | 445 +++++++++++++++ .../main/native/libhdfspp/lib/rpc/rpc_engine.cc | 125 +--- .../main/native/libhdfspp/lib/rpc/rpc_engine.h | 250 +------- .../native/libhdfspp/lib/rpc/sasl_protocol.cc | 1 + .../native/libhdfspp/tests/rpc_engine_test.cc | 2 +- 13 files changed, 1535 insertions(+), 1343 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/d75104ea/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/CMakeLists.txt index 84debdd..e5a26fb 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/CMakeLists.txt @@ -16,7 +16,7 @@ # limitations under the License. # -list(APPEND rpc_object_items rpc_connection.cc rpc_engine.cc sasl_protocol.cc sasl_engine.cc) +list(APPEND rpc_object_items rpc_connection_impl.cc rpc_engine.cc namenode_tracker.cc request.cc sasl_protocol.cc sasl_engine.cc) if (CMAKE_USING_CYRUS_SASL) list(APPEND rpc_object_items cyrus_sasl_engine.cc) endif (CMAKE_USING_CYRUS_SASL) http://git-wip-us.apache.org/repos/asf/hadoop/blob/d75104ea/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.cc new file mode 100644 index 0000000..9d9a816 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.cc @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "namenode_tracker.h" + +#include "common/logging.h" +#include "common/libhdfs_events_impl.h" +#include "common/util.h" + +namespace hdfs { + +static std::string format_endpoints(const std::vector<::asio::ip::tcp::endpoint> &pts) { + std::stringstream ss; + for(unsigned int i=0; i<pts.size(); i++) + if(i == pts.size() - 1) + ss << pts[i]; + else + ss << pts[i] << ", "; + return ss.str(); +} + +HANamenodeTracker::HANamenodeTracker(const std::vector<ResolvedNamenodeInfo> &servers, + ::asio::io_service *ioservice, + std::shared_ptr<LibhdfsEvents> event_handlers) + : enabled_(false), resolved_(false), + ioservice_(ioservice), event_handlers_(event_handlers) +{ + LOG_TRACE(kRPC, << "HANamenodeTracker got the following nodes"); + for(unsigned int i=0;i<servers.size();i++) + LOG_TRACE(kRPC, << servers[i].str()); + + if(servers.size() >= 2) { + LOG_TRACE(kRPC, << "Creating HA namenode tracker"); + if(servers.size() > 2) { + LOG_WARN(kRPC, << "Nameservice declares more than two nodes. Some won't be used."); + } + + active_info_ = servers[0]; + standby_info_ = servers[1]; + LOG_INFO(kRPC, << "Active namenode url = " << active_info_.uri.str()); + LOG_INFO(kRPC, << "Standby namenode url = " << standby_info_.uri.str()); + + enabled_ = true; + if(!active_info_.endpoints.empty() || !standby_info_.endpoints.empty()) { + resolved_ = true; + } + } +} + +HANamenodeTracker::~HANamenodeTracker() {} + +// Pass in endpoint from current connection, this will do a reverse lookup +// and return the info for the standby node. It will also swap its state internally. 
+ResolvedNamenodeInfo HANamenodeTracker::GetFailoverAndUpdate(::asio::ip::tcp::endpoint current_endpoint) { + LOG_TRACE(kRPC, << "Swapping from endpoint " << current_endpoint); + // Held for the whole call, including the fallback DNS resolution below. + mutex_guard swap_lock(swap_lock_); + + ResolvedNamenodeInfo failover_node; + + // Connected to the active node (the one that just failed): swap roles so the + // previous standby becomes the active node, and fail over to it. + if(IsCurrentActive_locked(current_endpoint)) { + std::swap(active_info_, standby_info_); + if(event_handlers_) + event_handlers_->call(FS_NN_FAILOVER_EVENT, active_info_.nameservice.c_str(), + reinterpret_cast<int64_t>(active_info_.uri.str().c_str())); + failover_node = active_info_; + } else if(IsCurrentStandby_locked(current_endpoint)) { + // Connected to the standby: fail over to the recorded active node without swapping. + if(event_handlers_) + event_handlers_->call(FS_NN_FAILOVER_EVENT, active_info_.nameservice.c_str(), + reinterpret_cast<int64_t>(active_info_.uri.str().c_str())); + failover_node = active_info_; + } else { + // Invalid state, throw for testing + std::string ep1 = format_endpoints(active_info_.endpoints); + std::string ep2 = format_endpoints(standby_info_.endpoints); + + std::stringstream msg; + msg << "Looked for " << current_endpoint << " in\n"; + msg << ep1 << " and\n"; + msg << ep2 << std::endl; + + LOG_ERROR(kRPC, << "Unable to find RPC connection in config " << msg.str() << ". Bailing out."); + throw std::runtime_error(msg.str()); + } + + // Both branches above leave failover_node as a copy of active_info_. + if(failover_node.endpoints.empty()) { + LOG_WARN(kRPC, << "No endpoints for node " << failover_node.uri.str() << " attempting to resolve again"); + if(ResolveInPlace(ioservice_, failover_node)) { + // Persist the resolved endpoints. Otherwise a later failure on this node would match + // neither IsCurrentActive_locked nor IsCurrentStandby_locked and would hit the throw above. 
+ active_info_ = failover_node; + } else { + LOG_ERROR(kRPC, << "Fallback endpoint resolution for node " << failover_node.uri.str() + << " failed. Please make sure your configuration is up to date."); + } + } + return failover_node; +}
+ +// Does not take swap_lock_ itself; GetFailoverAndUpdate calls it with the lock held. +bool HANamenodeTracker::IsCurrentActive_locked(const ::asio::ip::tcp::endpoint &ep) const { + for(unsigned int i=0;i<active_info_.endpoints.size();i++) { + if(ep.address() == active_info_.endpoints[i].address()) { + if(ep.port() != active_info_.endpoints[i].port()) + LOG_WARN(kRPC, << "Port mismatch: " << ep << " vs " << active_info_.endpoints[i] << " trying anyway.."); + return true; + } + } + return false; +} + +// Does not take swap_lock_ itself; GetFailoverAndUpdate calls it with the lock held. +bool HANamenodeTracker::IsCurrentStandby_locked(const ::asio::ip::tcp::endpoint &ep) const { + for(unsigned int i=0;i<standby_info_.endpoints.size();i++) { + if(ep.address() == standby_info_.endpoints[i].address()) { + if(ep.port() != standby_info_.endpoints[i].port()) + LOG_WARN(kRPC, << "Port mismatch: " << ep << " vs " << standby_info_.endpoints[i] << " trying anyway.."); + return true; + } + } + return false; +} + +} // end namespace hdfs http://git-wip-us.apache.org/repos/asf/hadoop/blob/d75104ea/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.h new file mode 100644 index 0000000..f51e13c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.h @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef LIB_RPC_NAMENODE_TRACKER_H +#define LIB_RPC_NAMENODE_TRACKER_H + +#include "common/libhdfs_events_impl.h" +#include "common/namenode_info.h" + +#include <asio/ip/tcp.hpp> + +#include <memory> +#include <mutex> +#include <vector> + +namespace hdfs { + +/* + * Tracker gives the RpcEngine a quick way to use an endpoint that just + * failed in order to lookup a set of endpoints for a failover node. + * + * Note: For now this only deals with 2 NameNodes, but that's the default + * anyway. + */ +class HANamenodeTracker { + public: + HANamenodeTracker(const std::vector<ResolvedNamenodeInfo> &servers, + ::asio::io_service *ioservice, + std::shared_ptr<LibhdfsEvents> event_handlers_); + + virtual ~HANamenodeTracker(); + + bool is_enabled() const { return enabled_; } + bool is_resolved() const { return resolved_; } + + // Get node opposite of the current one if possible (swaps active/standby) + // Note: This will always mutate internal state.
Use IsCurrentActive_locked/IsCurrentStandby_locked to + // get info without changing state + ResolvedNamenodeInfo GetFailoverAndUpdate(::asio::ip::tcp::endpoint current_endpoint); + + // The *_locked helpers do not acquire swap_lock_ themselves (the mutex is private); + // GetFailoverAndUpdate calls them while holding it. + bool IsCurrentActive_locked(const ::asio::ip::tcp::endpoint &ep) const; + bool IsCurrentStandby_locked(const ::asio::ip::tcp::endpoint &ep) const; + + private: + // If HA should be enabled, according to our options and runtime info like # nodes provided + bool enabled_; + // If we were able to resolve at least 1 HA namenode + bool resolved_; + + // Keep service in case a second round of DNS lookup is required + ::asio::io_service *ioservice_; + + // Event handlers, for now this is the simplest place to catch all failover events + // and push info out to client application. Possibly move into RPCEngine. + std::shared_ptr<LibhdfsEvents> event_handlers_; + + // Only support 1 active and 1 standby for now. + ResolvedNamenodeInfo active_info_; + ResolvedNamenodeInfo standby_info_; + + // Acquire when swapping the active and standby nodes + std::mutex swap_lock_; +}; + +} // end namespace hdfs +#endif // end include guard http://git-wip-us.apache.org/repos/asf/hadoop/blob/d75104ea/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.cc new file mode 100644 index 0000000..119962a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.cc @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include "request.h" +#include "rpc_engine.h" +#include "sasl_protocol.h" + +#include "RpcHeader.pb.h" +#include "ProtobufRpcEngine.pb.h" +#include "IpcConnectionContext.pb.h" + +#include <sstream> + +namespace hdfs { + +namespace pb = ::google::protobuf; +namespace pbio = ::google::protobuf::io; + +using namespace ::hadoop::common; +using namespace ::std::placeholders; + +static const int kNoRetry = -1; + +// Protobuf helper functions. 
+static void AddHeadersToPacket(std::string *res, + std::initializer_list<const pb::MessageLite *> headers, + const std::string *payload) { + int len = 0; + std::for_each( + headers.begin(), headers.end(), + [&len](const pb::MessageLite *v) { len += DelimitedPBMessageSize(v); }); + + if (payload) { + len += payload->size(); + } + + int net_len = htonl(len); + res->reserve(res->size() + sizeof(net_len) + len); + + pbio::StringOutputStream ss(res); + pbio::CodedOutputStream os(&ss); + os.WriteRaw(reinterpret_cast<const char *>(&net_len), sizeof(net_len)); + + uint8_t *buf = os.GetDirectBufferForNBytesAndAdvance(len); + assert(buf); + + std::for_each( + headers.begin(), headers.end(), [&buf](const pb::MessageLite *v) { + buf = pbio::CodedOutputStream::WriteVarint32ToArray(v->ByteSize(), buf); + buf = v->SerializeWithCachedSizesToArray(buf); + }); + + if (payload) { + buf = os.WriteStringToArray(*payload, buf); + } +} + +static void ConstructPayload(std::string *res, const pb::MessageLite *header) { + int len = DelimitedPBMessageSize(header); + res->reserve(len); + pbio::StringOutputStream ss(res); + pbio::CodedOutputStream os(&ss); + uint8_t *buf = os.GetDirectBufferForNBytesAndAdvance(len); + assert(buf); + buf = pbio::CodedOutputStream::WriteVarint32ToArray(header->ByteSize(), buf); + buf = header->SerializeWithCachedSizesToArray(buf); +} + +static void ConstructPayload(std::string *res, const std::string *request) { + int len = + pbio::CodedOutputStream::VarintSize32(request->size()) + request->size(); + res->reserve(len); + pbio::StringOutputStream ss(res); + pbio::CodedOutputStream os(&ss); + uint8_t *buf = os.GetDirectBufferForNBytesAndAdvance(len); + assert(buf); + buf = pbio::CodedOutputStream::WriteVarint32ToArray(request->size(), buf); + buf = os.WriteStringToArray(*request, buf); +} + +static void SetRequestHeader(LockFreeRpcEngine *engine, int call_id, + const std::string &method_name, int retry_count, + RpcRequestHeaderProto *rpc_header, + 
RequestHeaderProto *req_header) { + rpc_header->set_rpckind(RPC_PROTOCOL_BUFFER); + rpc_header->set_rpcop(RpcRequestHeaderProto::RPC_FINAL_PACKET); + rpc_header->set_callid(call_id); + if (retry_count != kNoRetry) + rpc_header->set_retrycount(retry_count); + rpc_header->set_clientid(engine->client_id()); + + req_header->set_methodname(method_name); + req_header->set_declaringclassprotocolname(engine->protocol_name()); + req_header->set_clientprotocolversion(engine->protocol_version()); +} + +// Request implementation + +Request::Request(LockFreeRpcEngine *engine, const std::string &method_name, int call_id, + const std::string &request, Handler &&handler) + : engine_(engine), + method_name_(method_name), + call_id_(call_id), + timer_(engine->io_service()), + handler_(std::move(handler)), + retry_count_(engine->retry_policy() ? 0 : kNoRetry), + failover_count_(0) { + ConstructPayload(&payload_, &request); +} + + +Request::Request(LockFreeRpcEngine *engine, const std::string &method_name, int call_id, + const pb::MessageLite *request, Handler &&handler) + : engine_(engine), + method_name_(method_name), + call_id_(call_id), + timer_(engine->io_service()), + handler_(std::move(handler)), + retry_count_(engine->retry_policy() ? 0 : kNoRetry), + failover_count_(0) { + ConstructPayload(&payload_, request); +} + +Request::Request(LockFreeRpcEngine *engine, Handler &&handler) + : engine_(engine), + call_id_(-1), + timer_(engine->io_service()), + handler_(std::move(handler)), + retry_count_(engine->retry_policy() ? 
0 : kNoRetry), + failover_count_(0) { +} + +void Request::GetPacket(std::string *res) const { + LOG_TRACE(kRPC, << "Request::GetPacket called"); + + if (payload_.empty()) + return; + + RpcRequestHeaderProto rpc_header; + RequestHeaderProto req_header; + SetRequestHeader(engine_, call_id_, method_name_, retry_count_, &rpc_header, + &req_header); + + // SASL messages don't have a request header + if (method_name_ != SASL_METHOD_NAME) + AddHeadersToPacket(res, {&rpc_header, &req_header}, &payload_); + else + AddHeadersToPacket(res, {&rpc_header}, &payload_); +} + +void Request::OnResponseArrived(pbio::CodedInputStream *is, + const Status &status) { + LOG_TRACE(kRPC, << "Request::OnResponseArrived called"); + handler_(is, status); +} + +std::string Request::GetDebugString() const { + // Basic description of this object, aimed at debugging + std::stringstream ss; + ss << "\nRequest Object:\n"; + ss << "\tMethod name = \"" << method_name_ << "\"\n"; + ss << "\tCall id = " << call_id_ << "\n"; + ss << "\tRetry Count = " << retry_count_ << "\n"; + ss << "\tFailover count = " << failover_count_ << "\n"; + return ss.str(); +} + +int Request::IncrementFailoverCount() { + // reset retry count when failing over + retry_count_ = 0; + return failover_count_++; +} + +} // end namespace hdfs http://git-wip-us.apache.org/repos/asf/hadoop/blob/d75104ea/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.h new file mode 100644 index 0000000..d265a4c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.h @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIB_RPC_RPC_REQUEST_H +#define LIB_RPC_RPC_REQUEST_H + +#include "hdfspp/status.h" +#include "common/util.h" +#include "common/new_delete.h" + +#include <functional> +#include <string> + +#include <google/protobuf/message_lite.h> +#include <google/protobuf/io/coded_stream.h> +#include <google/protobuf/io/zero_copy_stream_impl_lite.h> + +#include <asio/deadline_timer.hpp> + + +namespace hdfs { + +class LockFreeRpcEngine; +class SaslProtocol; + +/* + * Internal bookkeeping for an outstanding request from the consumer.
+ * + * Threading model: not thread-safe; should only be accessed from a single + * thread at a time + */ +class Request { + public: + MEMCHECKED_CLASS(Request) + // Invoked with the response stream (nullptr on timeout) and the resulting status. + typedef std::function<void(::google::protobuf::io::CodedInputStream *is, + const Status &status)> Handler; + + Request(LockFreeRpcEngine *engine, const std::string &method_name, int call_id, + const std::string &request, Handler &&callback); + Request(LockFreeRpcEngine *engine, const std::string &method_name, int call_id, + const ::google::protobuf::MessageLite *request, Handler &&callback); + + // Null request (with no actual message) used to track the state of an + // initial Connect call + Request(LockFreeRpcEngine *engine, Handler &&handler); + + int call_id() const { return call_id_; } + std::string method_name() const { return method_name_; } + ::asio::deadline_timer &timer() { return timer_; } + int IncrementRetryCount() { return retry_count_++; } + // Also resets the retry count; returns the failover count before incrementing. + int IncrementFailoverCount(); + void GetPacket(std::string *res) const; + void OnResponseArrived(::google::protobuf::io::CodedInputStream *is, + const Status &status); + + // Read-only accessor, so it is usable through a const Request. + int get_failover_count() const {return failover_count_;} + + std::string GetDebugString() const; + + private: + LockFreeRpcEngine *const engine_; + const std::string method_name_; + const int call_id_; + + ::asio::deadline_timer timer_; + std::string payload_; + const Handler handler_; + + int retry_count_; + int failover_count_; +}; + +} // end namespace hdfs +#endif // end include guard http://git-wip-us.apache.org/repos/asf/hadoop/blob/d75104ea/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc deleted file mode 100644 index f629d1f..0000000 --- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc +++ /dev/null @@ -1,563 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "rpc_engine.h" -#include "sasl_protocol.h" - -#include "RpcHeader.pb.h" -#include "ProtobufRpcEngine.pb.h" -#include "IpcConnectionContext.pb.h" - -#include "common/logging.h" -#include "common/util.h" - -#include <asio/read.hpp> - -namespace hdfs { - -namespace pb = ::google::protobuf; -namespace pbio = ::google::protobuf::io; - -using namespace ::hadoop::common; -using namespace ::std::placeholders; - -static const int kNoRetry = -1; - -static void AddHeadersToPacket( - std::string *res, std::initializer_list<const pb::MessageLite *> headers, - const std::string *payload) { - int len = 0; - std::for_each( - headers.begin(), headers.end(), - [&len](const pb::MessageLite *v) { len += DelimitedPBMessageSize(v); }); - - if (payload) { - len += payload->size(); - } - - int net_len = htonl(len); - res->reserve(res->size() + sizeof(net_len) + len); - - pbio::StringOutputStream ss(res); - pbio::CodedOutputStream os(&ss); - os.WriteRaw(reinterpret_cast<const char *>(&net_len), sizeof(net_len)); - - uint8_t *buf = 
os.GetDirectBufferForNBytesAndAdvance(len); - assert(buf); - - std::for_each( - headers.begin(), headers.end(), [&buf](const pb::MessageLite *v) { - buf = pbio::CodedOutputStream::WriteVarint32ToArray(v->ByteSize(), buf); - buf = v->SerializeWithCachedSizesToArray(buf); - }); - - if (payload) { - buf = os.WriteStringToArray(*payload, buf); - } -} - -static void ConstructPayload(std::string *res, const pb::MessageLite *header) { - int len = DelimitedPBMessageSize(header); - res->reserve(len); - pbio::StringOutputStream ss(res); - pbio::CodedOutputStream os(&ss); - uint8_t *buf = os.GetDirectBufferForNBytesAndAdvance(len); - assert(buf); - buf = pbio::CodedOutputStream::WriteVarint32ToArray(header->ByteSize(), buf); - buf = header->SerializeWithCachedSizesToArray(buf); -} - -static void ConstructPayload(std::string *res, const std::string *request) { - int len = - pbio::CodedOutputStream::VarintSize32(request->size()) + request->size(); - res->reserve(len); - pbio::StringOutputStream ss(res); - pbio::CodedOutputStream os(&ss); - uint8_t *buf = os.GetDirectBufferForNBytesAndAdvance(len); - assert(buf); - buf = pbio::CodedOutputStream::WriteVarint32ToArray(request->size(), buf); - buf = os.WriteStringToArray(*request, buf); -} - -static void SetRequestHeader(LockFreeRpcEngine *engine, int call_id, - const std::string &method_name, int retry_count, - RpcRequestHeaderProto *rpc_header, - RequestHeaderProto *req_header) { - rpc_header->set_rpckind(RPC_PROTOCOL_BUFFER); - rpc_header->set_rpcop(RpcRequestHeaderProto::RPC_FINAL_PACKET); - rpc_header->set_callid(call_id); - if (retry_count != kNoRetry) - rpc_header->set_retrycount(retry_count); - rpc_header->set_clientid(engine->client_id()); - - req_header->set_methodname(method_name); - req_header->set_declaringclassprotocolname(engine->protocol_name()); - req_header->set_clientprotocolversion(engine->protocol_version()); -} - -RpcConnection::~RpcConnection() {} - -Request::Request(LockFreeRpcEngine *engine, const 
std::string &method_name, int call_id, - const std::string &request, Handler &&handler) - : engine_(engine), - method_name_(method_name), - call_id_(call_id), - timer_(engine->io_service()), - handler_(std::move(handler)), - retry_count_(engine->retry_policy() ? 0 : kNoRetry), - failover_count_(0) { - ConstructPayload(&payload_, &request); -} - -Request::Request(LockFreeRpcEngine *engine, const std::string &method_name, int call_id, - const pb::MessageLite *request, Handler &&handler) - : engine_(engine), - method_name_(method_name), - call_id_(call_id), - timer_(engine->io_service()), - handler_(std::move(handler)), - retry_count_(engine->retry_policy() ? 0 : kNoRetry), - failover_count_(0) { - ConstructPayload(&payload_, request); -} - -Request::Request(LockFreeRpcEngine *engine, Handler &&handler) - : engine_(engine), - call_id_(-1), - timer_(engine->io_service()), - handler_(std::move(handler)), - retry_count_(engine->retry_policy() ? 0 : kNoRetry), - failover_count_(0) { -} - -void Request::GetPacket(std::string *res) const { - LOG_TRACE(kRPC, << "Request::GetPacket called"); - - if (payload_.empty()) - return; - - RpcRequestHeaderProto rpc_header; - RequestHeaderProto req_header; - SetRequestHeader(engine_, call_id_, method_name_, retry_count_, &rpc_header, - &req_header); - - // SASL messages don't have a request header - if (method_name_ != SASL_METHOD_NAME) - AddHeadersToPacket(res, {&rpc_header, &req_header}, &payload_); - else - AddHeadersToPacket(res, {&rpc_header}, &payload_); -} - -void Request::OnResponseArrived(pbio::CodedInputStream *is, - const Status &status) { - LOG_TRACE(kRPC, << "Request::OnResponseArrived called"); - handler_(is, status); -} - -std::string Request::GetDebugString() const { - // Basic description of this object, aimed at debugging - std::stringstream ss; - ss << "\nRequest Object:\n"; - ss << "\tMethod name = \"" << method_name_ << "\"\n"; - ss << "\tCall id = " << call_id_ << "\n"; - ss << "\tRetry Count = " << retry_count_ 
<< "\n"; - ss << "\tFailover count = " << failover_count_ << "\n"; - return ss.str(); -} - -int Request::IncrementFailoverCount() { - // reset retry count when failing over - retry_count_ = 0; - return failover_count_++; -} - -RpcConnection::RpcConnection(LockFreeRpcEngine *engine) - : engine_(engine), - connected_(kNotYetConnected) {} - -::asio::io_service &RpcConnection::io_service() { - return engine_->io_service(); -} - -void RpcConnection::StartReading() { - auto shared_this = shared_from_this(); - io_service().post([shared_this, this] () { - OnRecvCompleted(::asio::error_code(), 0); - }); -} - -void RpcConnection::HandshakeComplete(const Status &s) { - std::lock_guard<std::mutex> state_lock(connection_state_lock_); - - LOG_TRACE(kRPC, << "RpcConnectionImpl::HandshakeComplete called"); - - if (s.ok()) { - if (connected_ == kHandshaking) { - auto shared_this = shared_from_this(); - - connected_ = kAuthenticating; - if (auth_info_.useSASL()) { -#ifdef USE_SASL - sasl_protocol_ = std::make_shared<SaslProtocol>(cluster_name_, auth_info_, shared_from_this()); - sasl_protocol_->SetEventHandlers(event_handlers_); - sasl_protocol_->Authenticate([shared_this, this]( - const Status & status, const AuthInfo & new_auth_info) { - AuthComplete(status, new_auth_info); } ); -#else - AuthComplete_locked(Status::Error("SASL is required, but no SASL library was found"), auth_info_); -#endif - } else { - AuthComplete_locked(Status::OK(), auth_info_); - } - } - } else { - CommsError(s); - }; -} - -void RpcConnection::AuthComplete(const Status &s, const AuthInfo & new_auth_info) { - std::lock_guard<std::mutex> state_lock(connection_state_lock_); - AuthComplete_locked(s, new_auth_info); -} - -void RpcConnection::AuthComplete_locked(const Status &s, const AuthInfo & new_auth_info) { - assert(lock_held(connection_state_lock_)); // Must be holding lock before calling - LOG_TRACE(kRPC, << "RpcConnectionImpl::AuthComplete called"); - - // Free the sasl_protocol object - 
sasl_protocol_.reset(); - - if (s.ok()) { - auth_info_ = new_auth_info; - - auto shared_this = shared_from_this(); - SendContext([shared_this, this](const Status & s) { - ContextComplete(s); - }); - } else { - CommsError(s); - }; -} - -void RpcConnection::ContextComplete(const Status &s) { - std::lock_guard<std::mutex> state_lock(connection_state_lock_); - - LOG_TRACE(kRPC, << "RpcConnectionImpl::ContextComplete called"); - - if (s.ok()) { - if (connected_ == kAuthenticating) { - connected_ = kConnected; - } - FlushPendingRequests(); - } else { - CommsError(s); - }; -} - -void RpcConnection::AsyncFlushPendingRequests() { - std::shared_ptr<RpcConnection> shared_this = shared_from_this(); - io_service().post([shared_this, this]() { - std::lock_guard<std::mutex> state_lock(connection_state_lock_); - - LOG_TRACE(kRPC, << "RpcConnection::AsyncFlushPendingRequests called (connected=" << ToString(connected_) << ")"); - - if (!request_over_the_wire_) { - FlushPendingRequests(); - } - }); -} - -Status RpcConnection::HandleRpcResponse(std::shared_ptr<Response> response) { - assert(lock_held(connection_state_lock_)); // Must be holding lock before calling - - response->ar.reset(new pbio::ArrayInputStream(&response->data_[0], response->data_.size())); - response->in.reset(new pbio::CodedInputStream(response->ar.get())); - response->in->PushLimit(response->data_.size()); - RpcResponseHeaderProto h; - ReadDelimitedPBMessage(response->in.get(), &h); - - auto req = RemoveFromRunningQueue(h.callid()); - if (!req) { - LOG_WARN(kRPC, << "RPC response with Unknown call id " << h.callid()); - if((int32_t)h.callid() == RpcEngine::kCallIdSasl) { - return Status::AuthenticationFailed("You have an unsecured client connecting to a secured server"); - } else { - return Status::Error("Rpc response with unknown call id"); - } - } - - Status status; - if(event_handlers_) { - event_response event_resp = event_handlers_->call(FS_NN_READ_EVENT, cluster_name_.c_str(), 0); -#ifndef 
LIBHDFSPP_SIMULATE_ERROR_DISABLED - if (event_resp.response() == event_response::kTest_Error) { - status = event_resp.status(); - } -#endif - } - - if (status.ok() && h.has_exceptionclassname()) { - status = - Status::Exception(h.exceptionclassname().c_str(), h.errormsg().c_str()); - } - - if(status.get_server_exception_type() == Status::kStandbyException) { - LOG_WARN(kRPC, << "Tried to connect to standby. status = " << status.ToString()); - - // We got the request back, but it needs to be resent to the other NN - std::vector<std::shared_ptr<Request>> reqs_to_redirect = {req}; - PrependRequests_locked(reqs_to_redirect); - - CommsError(status); - return status; - } - - io_service().post([req, response, status]() { - req->OnResponseArrived(response->in.get(), status); // Never call back while holding a lock - }); - - return Status::OK(); -} - -void RpcConnection::HandleRpcTimeout(std::shared_ptr<Request> req, - const ::asio::error_code &ec) { - if (ec.value() == asio::error::operation_aborted) { - return; - } - - std::lock_guard<std::mutex> state_lock(connection_state_lock_); - auto r = RemoveFromRunningQueue(req->call_id()); - if (!r) { - // The RPC might have been finished and removed from the queue - return; - } - - Status stat = ToStatus(ec ? 
ec : make_error_code(::asio::error::timed_out)); - - r->OnResponseArrived(nullptr, stat); -} - -std::shared_ptr<std::string> RpcConnection::PrepareHandshakePacket() { - assert(lock_held(connection_state_lock_)); // Must be holding lock before calling - - /** From Client.java: - * - * Write the connection header - this is sent when connection is established - * +----------------------------------+ - * | "hrpc" 4 bytes | - * +----------------------------------+ - * | Version (1 byte) | - * +----------------------------------+ - * | Service Class (1 byte) | - * +----------------------------------+ - * | AuthProtocol (1 byte) | - * +----------------------------------+ - * - * AuthProtocol: 0->none, -33->SASL - */ - - char auth_protocol = auth_info_.useSASL() ? -33 : 0; - const char handshake_header[] = {'h', 'r', 'p', 'c', - RpcEngine::kRpcVersion, 0, auth_protocol}; - auto res = - std::make_shared<std::string>(handshake_header, sizeof(handshake_header)); - - return res; -} - -std::shared_ptr<std::string> RpcConnection::PrepareContextPacket() { - // This needs to be send after the SASL handshake, and - // after the SASL handshake (if any) - assert(lock_held(connection_state_lock_)); // Must be holding lock before calling - - auto res = std::make_shared<std::string>(); - - RpcRequestHeaderProto h; - h.set_rpckind(RPC_PROTOCOL_BUFFER); - h.set_rpcop(RpcRequestHeaderProto::RPC_FINAL_PACKET); - h.set_callid(RpcEngine::kCallIdConnectionContext); - h.set_clientid(engine_->client_name()); - - IpcConnectionContextProto handshake; - handshake.set_protocol(engine_->protocol_name()); - const std::string & user_name = auth_info_.getUser(); - if (!user_name.empty()) { - *handshake.mutable_userinfo()->mutable_effectiveuser() = user_name; - } - AddHeadersToPacket(res.get(), {&h, &handshake}, nullptr); - - return res; -} - -void RpcConnection::AsyncRpc( - const std::string &method_name, const ::google::protobuf::MessageLite *req, - std::shared_ptr<::google::protobuf::MessageLite> 
resp, - const RpcCallback &handler) { - std::lock_guard<std::mutex> state_lock(connection_state_lock_); - AsyncRpc_locked(method_name, req, resp, handler); -} - -void RpcConnection::AsyncRpc_locked( - const std::string &method_name, const ::google::protobuf::MessageLite *req, - std::shared_ptr<::google::protobuf::MessageLite> resp, - const RpcCallback &handler) { - assert(lock_held(connection_state_lock_)); // Must be holding lock before calling - - auto wrapped_handler = - [resp, handler](pbio::CodedInputStream *is, const Status &status) { - if (status.ok()) { - if (is) { // Connect messages will not have an is - ReadDelimitedPBMessage(is, resp.get()); - } - } - handler(status); - }; - - int call_id = (method_name != SASL_METHOD_NAME ? engine_->NextCallId() : RpcEngine::kCallIdSasl); - auto r = std::make_shared<Request>(engine_, method_name, call_id, req, - std::move(wrapped_handler)); - auto r_vector = std::vector<std::shared_ptr<Request> > (1, r); - SendRpcRequests(r_vector); -} - -void RpcConnection::AsyncRpc(const std::vector<std::shared_ptr<Request> > & requests) { - std::lock_guard<std::mutex> state_lock(connection_state_lock_); - SendRpcRequests(requests); -} - -void RpcConnection::SendRpcRequests(const std::vector<std::shared_ptr<Request> > & requests) { - LOG_TRACE(kRPC, << "RpcConnection::SendRpcRequests[] called; connected=" << ToString(connected_)); - assert(lock_held(connection_state_lock_)); // Must be holding lock before calling - - if (connected_ == kDisconnected) { - // Oops. The connection failed _just_ before the engine got a chance - // to send it. 
Register it as a failure - Status status = Status::ResourceUnavailable("RpcConnection closed before send."); - engine_->AsyncRpcCommsError(status, shared_from_this(), requests); - } else { - for (auto r: requests) { - if (r->method_name() != SASL_METHOD_NAME) - pending_requests_.push_back(r); - else - auth_requests_.push_back(r); - } - if (connected_ == kConnected || connected_ == kHandshaking || connected_ == kAuthenticating) { // Dont flush if we're waiting or handshaking - FlushPendingRequests(); - } - } -} - - -void RpcConnection::PreEnqueueRequests( - std::vector<std::shared_ptr<Request>> requests) { - // Public method - acquire lock - std::lock_guard<std::mutex> state_lock(connection_state_lock_); - - LOG_DEBUG(kRPC, << "RpcConnection::PreEnqueueRequests called"); - - assert(connected_ == kNotYetConnected); - - pending_requests_.insert(pending_requests_.end(), requests.begin(), - requests.end()); - // Don't start sending yet; will flush when connected -} - -// Only call when already holding conn state lock -void RpcConnection::PrependRequests_locked( std::vector<std::shared_ptr<Request>> requests) { - LOG_DEBUG(kRPC, << "RpcConnection::PrependRequests called"); - - pending_requests_.insert(pending_requests_.begin(), requests.begin(), - requests.end()); - // Don't start sending yet; will flush when connected -} - -void RpcConnection::SetEventHandlers(std::shared_ptr<LibhdfsEvents> event_handlers) { - std::lock_guard<std::mutex> state_lock(connection_state_lock_); - event_handlers_ = event_handlers; - if (sasl_protocol_) { - sasl_protocol_->SetEventHandlers(event_handlers); - } -} - -void RpcConnection::SetClusterName(std::string cluster_name) { - std::lock_guard<std::mutex> state_lock(connection_state_lock_); - cluster_name_ = cluster_name; -} - -void RpcConnection::CommsError(const Status &status) { - assert(lock_held(connection_state_lock_)); // Must be holding lock before calling - LOG_DEBUG(kRPC, << "RpcConnection::CommsError called"); - - Disconnect(); - 
- // Anything that has been queued to the connection (on the fly or pending) - // will get dinged for a retry - std::vector<std::shared_ptr<Request>> requestsToReturn; - std::transform(requests_on_fly_.begin(), requests_on_fly_.end(), - std::back_inserter(requestsToReturn), - std::bind(&RequestOnFlyMap::value_type::second, _1)); - requests_on_fly_.clear(); - - requestsToReturn.insert(requestsToReturn.end(), - std::make_move_iterator(pending_requests_.begin()), - std::make_move_iterator(pending_requests_.end())); - pending_requests_.clear(); - - engine_->AsyncRpcCommsError(status, shared_from_this(), requestsToReturn); -} - -void RpcConnection::ClearAndDisconnect(const ::asio::error_code &ec) { - Disconnect(); - std::vector<std::shared_ptr<Request>> requests; - std::transform(requests_on_fly_.begin(), requests_on_fly_.end(), - std::back_inserter(requests), - std::bind(&RequestOnFlyMap::value_type::second, _1)); - requests_on_fly_.clear(); - requests.insert(requests.end(), - std::make_move_iterator(pending_requests_.begin()), - std::make_move_iterator(pending_requests_.end())); - pending_requests_.clear(); - for (const auto &req : requests) { - req->OnResponseArrived(nullptr, ToStatus(ec)); - } -} - -std::shared_ptr<Request> RpcConnection::RemoveFromRunningQueue(int call_id) { - assert(lock_held(connection_state_lock_)); // Must be holding lock before calling - auto it = requests_on_fly_.find(call_id); - if (it == requests_on_fly_.end()) { - return std::shared_ptr<Request>(); - } - - auto req = it->second; - requests_on_fly_.erase(it); - return req; -} - -std::string RpcConnection::ToString(ConnectedState connected) { - switch(connected) { - case kNotYetConnected: return "NotYetConnected"; - case kConnecting: return "Connecting"; - case kHandshaking: return "Handshaking"; - case kAuthenticating: return "Authenticating"; - case kConnected: return "Connected"; - case kDisconnected: return "Disconnected"; - default: return "Invalid ConnectedState"; - } -} - -} 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d75104ea/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h index a6a07c4..7a671fe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h @@ -15,430 +15,166 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#ifndef LIB_RPC_RPC_CONNECTION_H_ -#define LIB_RPC_RPC_CONNECTION_H_ +#ifndef LIB_RPC_RPC_CONNECTION_H +#define LIB_RPC_RPC_CONNECTION_H -#include "rpc_engine.h" +/* + * Encapsulates a persistent connection to the NameNode, and the sending of + * RPC requests and evaluating their responses. + * + * Can have multiple RPC requests in-flight simultaneously, but they are + * evaluated in-order on the server side in a blocking manner. + * + * Threading model: public interface is thread-safe + * All handlers passed in to method calls will be called from an asio thread, + * and will not be holding any internal RpcConnection locks. 
+ */ +#include "request.h" #include "common/auth_info.h" -#include "common/logging.h" -#include "common/util.h" #include "common/libhdfs_events_impl.h" -#include "sasl_protocol.h" - -#include <asio/connect.hpp> -#include <asio/read.hpp> -#include <asio/write.hpp> +#include "common/new_delete.h" +#include "hdfspp/status.h" -#include <system_error> +#include <functional> +#include <memory> +#include <vector> +#include <deque> +#include <unordered_map> namespace hdfs { -template <class Socket> -class RpcConnectionImpl : public RpcConnection { -public: - MEMCHECKED_CLASS(RpcConnectionImpl); +typedef const std::function<void(const Status &)> RpcCallback; - RpcConnectionImpl(RpcEngine *engine); - virtual ~RpcConnectionImpl() override; +class LockFreeRpcEngine; +class SaslProtocol; +class RpcConnection : public std::enable_shared_from_this<RpcConnection> { + public: + MEMCHECKED_CLASS(RpcConnection) + RpcConnection(LockFreeRpcEngine *engine); + virtual ~RpcConnection(); + + // Note that a single server can have multiple endpoints - especially both + // an ipv4 and ipv6 endpoint virtual void Connect(const std::vector<::asio::ip::tcp::endpoint> &server, const AuthInfo & auth_info, - RpcCallback &handler); - virtual void ConnectAndFlush( - const std::vector<::asio::ip::tcp::endpoint> &server) override; - virtual void SendHandshake(RpcCallback &handler) override; - virtual void SendContext(RpcCallback &handler) override; - virtual void Disconnect() override; - virtual void OnSendCompleted(const ::asio::error_code &ec, - size_t transferred) override; - virtual void OnRecvCompleted(const ::asio::error_code &ec, - size_t transferred) override; - virtual void FlushPendingRequests() override; + RpcCallback &handler) = 0; + virtual void ConnectAndFlush(const std::vector<::asio::ip::tcp::endpoint> &server) = 0; + virtual void Disconnect() = 0; + + void StartReading(); + void AsyncRpc(const std::string &method_name, + const ::google::protobuf::MessageLite *req, + 
std::shared_ptr<::google::protobuf::MessageLite> resp, + const RpcCallback &handler); + void AsyncRpc(const std::vector<std::shared_ptr<Request> > & requests); - Socket &TEST_get_mutable_socket() { return socket_; } + // Enqueue requests before the connection is connected. Will be flushed + // on connect + void PreEnqueueRequests(std::vector<std::shared_ptr<Request>> requests); - void TEST_set_connected(bool connected) { connected_ = connected ? kConnected : kNotYetConnected; } + // Put requests at the front of the current request queue + void PrependRequests_locked(std::vector<std::shared_ptr<Request>> requests); - private: - const Options options_; - ::asio::ip::tcp::endpoint current_endpoint_; - std::vector<::asio::ip::tcp::endpoint> additional_endpoints_; - Socket socket_; - ::asio::deadline_timer connect_timer_; + void SetEventHandlers(std::shared_ptr<LibhdfsEvents> event_handlers); + void SetClusterName(std::string cluster_name); - void ConnectComplete(const ::asio::error_code &ec, const ::asio::ip::tcp::endpoint &remote); + LockFreeRpcEngine *engine() { return engine_; } + ::asio::io_service &io_service(); + + protected: + struct Response { + enum ResponseState { + kReadLength, + kReadContent, + kParseResponse, + } state_; + unsigned length_; + std::vector<char> data_; + + std::unique_ptr<::google::protobuf::io::ArrayInputStream> ar; + std::unique_ptr<::google::protobuf::io::CodedInputStream> in; + + Response() : state_(kReadLength), length_(0) {} + }; + + + // Initial handshaking protocol: connect->handshake-->(auth)?-->context->connected + virtual void SendHandshake(RpcCallback &handler) = 0; + void HandshakeComplete(const Status &s); + void AuthComplete(const Status &s, const AuthInfo & new_auth_info); + void AuthComplete_locked(const Status &s, const AuthInfo & new_auth_info); + virtual void SendContext(RpcCallback &handler) = 0; + void ContextComplete(const Status &s); + + virtual void OnSendCompleted(const ::asio::error_code &ec, + size_t transferred) 
= 0; + virtual void OnRecvCompleted(const ::asio::error_code &ec, + size_t transferred) = 0; + virtual void FlushPendingRequests()=0; // Synchronously write the next request + + void AsyncRpc_locked( + const std::string &method_name, + const ::google::protobuf::MessageLite *req, + std::shared_ptr<::google::protobuf::MessageLite> resp, + const RpcCallback &handler); + void SendRpcRequests(const std::vector<std::shared_ptr<Request> > & requests); + void AsyncFlushPendingRequests(); // Queue requests to be flushed at a later time + + + + std::shared_ptr<std::string> PrepareHandshakePacket(); + std::shared_ptr<std::string> PrepareContextPacket(); + static std::string SerializeRpcRequest(const std::string &method_name, + const ::google::protobuf::MessageLite *req); + + Status HandleRpcResponse(std::shared_ptr<Response> response); + void HandleRpcTimeout(std::shared_ptr<Request> req, + const ::asio::error_code &ec); + void CommsError(const Status &status); + + void ClearAndDisconnect(const ::asio::error_code &ec); + std::shared_ptr<Request> RemoveFromRunningQueue(int call_id); + + LockFreeRpcEngine *const engine_; + std::shared_ptr<Response> current_response_state_; + AuthInfo auth_info_; + + // Connection can have deferred connection, especially when we're pausing + // during retry + enum ConnectedState { + kNotYetConnected, + kConnecting, + kHandshaking, + kAuthenticating, + kConnected, + kDisconnected + }; + static std::string ToString(ConnectedState connected); + ConnectedState connected_; + + // State machine for performing a SASL handshake + std::shared_ptr<SaslProtocol> sasl_protocol_; + // The request being sent over the wire; will also be in requests_on_fly_ + std::shared_ptr<Request> request_over_the_wire_; + // Requests to be sent over the wire + std::deque<std::shared_ptr<Request>> pending_requests_; + // Requests to be sent over the wire during authentication; not retried if + // there is a connection error + std::deque<std::shared_ptr<Request>> 
auth_requests_; + // Requests that are waiting for responses + typedef std::unordered_map<int, std::shared_ptr<Request>> RequestOnFlyMap; + RequestOnFlyMap requests_on_fly_; + std::shared_ptr<LibhdfsEvents> event_handlers_; + std::string cluster_name_; + + // Lock for mutable parts of this class that need to be thread safe + std::mutex connection_state_lock_; + + friend class SaslProtocol; }; -template <class Socket> -RpcConnectionImpl<Socket>::RpcConnectionImpl(RpcEngine *engine) - : RpcConnection(engine), - options_(engine->options()), - socket_(engine->io_service()), - connect_timer_(engine->io_service()) -{ - LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called &" << (void*)this); -} - -template <class Socket> -RpcConnectionImpl<Socket>::~RpcConnectionImpl() { - LOG_DEBUG(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called &" << (void*)this); - - if (pending_requests_.size() > 0) - LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the pending queue"); - if (requests_on_fly_.size() > 0) - LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the requests_on_fly queue"); -} - -template <class Socket> -void RpcConnectionImpl<Socket>::Connect( - const std::vector<::asio::ip::tcp::endpoint> &server, - const AuthInfo & auth_info, - RpcCallback &handler) { - LOG_TRACE(kRPC, << "RpcConnectionImpl::Connect called"); - - this->auth_info_ = auth_info; - - auto connectionSuccessfulReq = std::make_shared<Request>( - engine_, [handler](::google::protobuf::io::CodedInputStream *is, - const Status &status) { - (void)is; - handler(status); - }); - pending_requests_.push_back(connectionSuccessfulReq); - this->ConnectAndFlush(server); // need "this" so compiler can infer type of CAF -} - -template <class Socket> -void RpcConnectionImpl<Socket>::ConnectAndFlush( - const std::vector<::asio::ip::tcp::endpoint> &server) { - - LOG_INFO(kRPC, << "ConnectAndFlush called"); - std::lock_guard<std::mutex> 
state_lock(connection_state_lock_); - - if (server.empty()) { - Status s = Status::InvalidArgument("No endpoints provided"); - CommsError(s); - return; - } - - if (connected_ == kConnected) { - FlushPendingRequests(); - return; - } - if (connected_ != kNotYetConnected) { - LOG_WARN(kRPC, << "RpcConnectionImpl::ConnectAndFlush called while connected=" << ToString(connected_)); - return; - } - connected_ = kConnecting; - - // Take the first endpoint, but remember the alternatives for later - additional_endpoints_ = server; - ::asio::ip::tcp::endpoint first_endpoint = additional_endpoints_.front(); - additional_endpoints_.erase(additional_endpoints_.begin()); - current_endpoint_ = first_endpoint; - - auto shared_this = shared_from_this(); - socket_.async_connect(first_endpoint, [shared_this, this, first_endpoint](const ::asio::error_code &ec) { - ConnectComplete(ec, first_endpoint); - }); - - // Prompt the timer to timeout - auto weak_this = std::weak_ptr<RpcConnection>(shared_this); - connect_timer_.expires_from_now( - std::chrono::milliseconds(options_.rpc_connect_timeout)); - connect_timer_.async_wait([shared_this, this, first_endpoint](const ::asio::error_code &ec) { - if (ec) - ConnectComplete(ec, first_endpoint); - else - ConnectComplete(make_error_code(asio::error::host_unreachable), first_endpoint); - }); -} - -template <class Socket> -void RpcConnectionImpl<Socket>::ConnectComplete(const ::asio::error_code &ec, const ::asio::ip::tcp::endpoint & remote) { - auto shared_this = RpcConnectionImpl<Socket>::shared_from_this(); - std::lock_guard<std::mutex> state_lock(connection_state_lock_); - connect_timer_.cancel(); - - LOG_TRACE(kRPC, << "RpcConnectionImpl::ConnectComplete called"); - - // Could be an old async connect returning a result after we've moved on - if (remote != current_endpoint_) { - LOG_DEBUG(kRPC, << "Got ConnectComplete for " << remote << " but current_endpoint_ is " << current_endpoint_); - return; - } - if (connected_ != kConnecting) { - 
LOG_DEBUG(kRPC, << "Got ConnectComplete but current state is " << connected_);; - return; - } - - Status status = ToStatus(ec); - if(event_handlers_) { - event_response event_resp = event_handlers_->call(FS_NN_CONNECT_EVENT, cluster_name_.c_str(), 0); -#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED - if (event_resp.response() == event_response::kTest_Error) { - status = event_resp.status(); - } -#endif - } - - if (status.ok()) { - StartReading(); - SendHandshake([shared_this, this](const Status & s) { - HandshakeComplete(s); - }); - } else { - LOG_DEBUG(kRPC, << "Rpc connection failed; err=" << status.ToString());; - std::string err = SafeDisconnect(get_asio_socket_ptr(&socket_)); - if(!err.empty()) { - LOG_INFO(kRPC, << "Rpc connection failed to connect to endpoint, error closing connection: " << err); - } - - if (!additional_endpoints_.empty()) { - // If we have additional endpoints, keep trying until we either run out or - // hit one - ::asio::ip::tcp::endpoint next_endpoint = additional_endpoints_.front(); - additional_endpoints_.erase(additional_endpoints_.begin()); - current_endpoint_ = next_endpoint; - - socket_.async_connect(next_endpoint, [shared_this, this, next_endpoint](const ::asio::error_code &ec) { - ConnectComplete(ec, next_endpoint); - }); - connect_timer_.expires_from_now( - std::chrono::milliseconds(options_.rpc_connect_timeout)); - connect_timer_.async_wait([shared_this, this, next_endpoint](const ::asio::error_code &ec) { - if (ec) - ConnectComplete(ec, next_endpoint); - else - ConnectComplete(make_error_code(asio::error::host_unreachable), next_endpoint); - }); - } else { - CommsError(status); - } - } -} - -template <class Socket> -void RpcConnectionImpl<Socket>::SendHandshake(RpcCallback &handler) { - assert(lock_held(connection_state_lock_)); // Must be holding lock before calling - - LOG_TRACE(kRPC, << "RpcConnectionImpl::SendHandshake called"); - connected_ = kHandshaking; - - auto shared_this = shared_from_this(); - auto handshake_packet = 
PrepareHandshakePacket(); - ::asio::async_write(socket_, asio::buffer(*handshake_packet), - [handshake_packet, handler, shared_this, this]( - const ::asio::error_code &ec, size_t) { - Status status = ToStatus(ec); - handler(status); - }); -} - -template <class Socket> -void RpcConnectionImpl<Socket>::SendContext(RpcCallback &handler) { - assert(lock_held(connection_state_lock_)); // Must be holding lock before calling - - LOG_TRACE(kRPC, << "RpcConnectionImpl::SendContext called"); - - auto shared_this = shared_from_this(); - auto context_packet = PrepareContextPacket(); - ::asio::async_write(socket_, asio::buffer(*context_packet), - [context_packet, handler, shared_this, this]( - const ::asio::error_code &ec, size_t) { - Status status = ToStatus(ec); - handler(status); - }); -} - -template <class Socket> -void RpcConnectionImpl<Socket>::OnSendCompleted(const ::asio::error_code &ec, - size_t) { - using std::placeholders::_1; - using std::placeholders::_2; - std::lock_guard<std::mutex> state_lock(connection_state_lock_); - - LOG_TRACE(kRPC, << "RpcConnectionImpl::OnSendCompleted called"); - - request_over_the_wire_.reset(); - if (ec) { - LOG_WARN(kRPC, << "Network error during RPC write: " << ec.message()); - CommsError(ToStatus(ec)); - return; - } - - FlushPendingRequests(); -} - -template <class Socket> -void RpcConnectionImpl<Socket>::FlushPendingRequests() { - using namespace ::std::placeholders; - - // Lock should be held - assert(lock_held(connection_state_lock_)); - - LOG_TRACE(kRPC, << "RpcConnectionImpl::FlushPendingRequests called"); - - // Don't send if we don't need to - if (request_over_the_wire_) { - return; - } - - std::shared_ptr<Request> req; - switch (connected_) { - case kNotYetConnected: - return; - case kConnecting: - return; - case kHandshaking: - return; - case kAuthenticating: - if (auth_requests_.empty()) { - return; - } - req = auth_requests_.front(); - auth_requests_.erase(auth_requests_.begin()); - break; - case kConnected: - if 
(pending_requests_.empty()) { - return; - } - req = pending_requests_.front(); - pending_requests_.erase(pending_requests_.begin()); - break; - case kDisconnected: - LOG_DEBUG(kRPC, << "RpcConnectionImpl::FlushPendingRequests attempted to flush a " << ToString(connected_) << " connection"); - return; - default: - LOG_DEBUG(kRPC, << "RpcConnectionImpl::FlushPendingRequests invalid state: " << ToString(connected_)); - return; - } - - std::shared_ptr<RpcConnection> shared_this = shared_from_this(); - auto weak_this = std::weak_ptr<RpcConnection>(shared_this); - auto weak_req = std::weak_ptr<Request>(req); - - std::shared_ptr<std::string> payload = std::make_shared<std::string>(); - req->GetPacket(payload.get()); - if (!payload->empty()) { - assert(requests_on_fly_.find(req->call_id()) == requests_on_fly_.end()); - requests_on_fly_[req->call_id()] = req; - request_over_the_wire_ = req; - - req->timer().expires_from_now( - std::chrono::milliseconds(options_.rpc_timeout)); - req->timer().async_wait([weak_this, weak_req, this](const ::asio::error_code &ec) { - auto timeout_this = weak_this.lock(); - auto timeout_req = weak_req.lock(); - if (timeout_this && timeout_req) - this->HandleRpcTimeout(timeout_req, ec); - }); - - asio::async_write(socket_, asio::buffer(*payload), - [shared_this, this, payload](const ::asio::error_code &ec, - size_t size) { - OnSendCompleted(ec, size); - }); - } else { // Nothing to send for this request, inform the handler immediately - io_service().post( - // Never hold locks when calling a callback - [req]() { req->OnResponseArrived(nullptr, Status::OK()); } - ); - - // Reschedule to flush the next one - AsyncFlushPendingRequests(); - } -} - - -template <class Socket> -void RpcConnectionImpl<Socket>::OnRecvCompleted(const ::asio::error_code &original_ec, - size_t) { - using std::placeholders::_1; - using std::placeholders::_2; - std::lock_guard<std::mutex> state_lock(connection_state_lock_); - - ::asio::error_code my_ec(original_ec); - - 
LOG_TRACE(kRPC, << "RpcConnectionImpl::OnRecvCompleted called"); - - std::shared_ptr<RpcConnection> shared_this = shared_from_this(); - - if(event_handlers_) { - event_response event_resp = event_handlers_->call(FS_NN_READ_EVENT, cluster_name_.c_str(), 0); -#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED - if (event_resp.response() == event_response::kTest_Error) { - my_ec = std::make_error_code(std::errc::network_down); - } -#endif - } - - switch (my_ec.value()) { - case 0: - // No errors - break; - case asio::error::operation_aborted: - // The event loop has been shut down. Ignore the error. - return; - default: - LOG_WARN(kRPC, << "Network error during RPC read: " << my_ec.message()); - CommsError(ToStatus(my_ec)); - return; - } - - if (!current_response_state_) { /* start a new one */ - current_response_state_ = std::make_shared<Response>(); - } - - if (current_response_state_->state_ == Response::kReadLength) { - current_response_state_->state_ = Response::kReadContent; - auto buf = ::asio::buffer(reinterpret_cast<char *>(¤t_response_state_->length_), - sizeof(current_response_state_->length_)); - asio::async_read( - socket_, buf, - [shared_this, this](const ::asio::error_code &ec, size_t size) { - OnRecvCompleted(ec, size); - }); - } else if (current_response_state_->state_ == Response::kReadContent) { - current_response_state_->state_ = Response::kParseResponse; - current_response_state_->length_ = ntohl(current_response_state_->length_); - current_response_state_->data_.resize(current_response_state_->length_); - asio::async_read( - socket_, ::asio::buffer(current_response_state_->data_), - [shared_this, this](const ::asio::error_code &ec, size_t size) { - OnRecvCompleted(ec, size); - }); - } else if (current_response_state_->state_ == Response::kParseResponse) { - // Check return status from the RPC response. We may have received a msg - // indicating a server side error. 
- - Status stat = HandleRpcResponse(current_response_state_); - - if(stat.get_server_exception_type() == Status::kStandbyException) { - // May need to bail out, connect to new NN, and restart loop - LOG_INFO(kRPC, << "Communicating with standby NN, attempting to reconnect"); - } - - current_response_state_ = nullptr; - StartReading(); - } -} - -template <class Socket> -void RpcConnectionImpl<Socket>::Disconnect() { - assert(lock_held(connection_state_lock_)); // Must be holding lock before calling - - LOG_INFO(kRPC, << "RpcConnectionImpl::Disconnect called"); - - request_over_the_wire_.reset(); - if (connected_ == kConnecting || connected_ == kHandshaking || connected_ == kAuthenticating || connected_ == kConnected) { - // Don't print out errors, we were expecting a disconnect here - SafeDisconnect(get_asio_socket_ptr(&socket_)); - } - connected_ = kDisconnected; -} -} - -#endif +} // end namespace hdfs +#endif // end include Guard http://git-wip-us.apache.org/repos/asf/hadoop/blob/d75104ea/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.cc new file mode 100644 index 0000000..7accaf8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.cc @@ -0,0 +1,446 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "rpc_engine.h" +#include "rpc_connection_impl.h" +#include "sasl_protocol.h" + +#include "RpcHeader.pb.h" +#include "ProtobufRpcEngine.pb.h" +#include "IpcConnectionContext.pb.h" + +namespace hdfs { + +namespace pb = ::google::protobuf; +namespace pbio = ::google::protobuf::io; + +using namespace ::hadoop::common; +using namespace ::std::placeholders; + +static const int kNoRetry = -1; + +static void AddHeadersToPacket( + std::string *res, std::initializer_list<const pb::MessageLite *> headers, + const std::string *payload) { + int len = 0; + std::for_each( + headers.begin(), headers.end(), + [&len](const pb::MessageLite *v) { len += DelimitedPBMessageSize(v); }); + + if (payload) { + len += payload->size(); + } + + int net_len = htonl(len); + res->reserve(res->size() + sizeof(net_len) + len); + + pbio::StringOutputStream ss(res); + pbio::CodedOutputStream os(&ss); + os.WriteRaw(reinterpret_cast<const char *>(&net_len), sizeof(net_len)); + + uint8_t *buf = os.GetDirectBufferForNBytesAndAdvance(len); + assert(buf); + + std::for_each( + headers.begin(), headers.end(), [&buf](const pb::MessageLite *v) { + buf = pbio::CodedOutputStream::WriteVarint32ToArray(v->ByteSize(), buf); + buf = v->SerializeWithCachedSizesToArray(buf); + }); + + if (payload) { + buf = os.WriteStringToArray(*payload, buf); + } +} + +RpcConnection::~RpcConnection() {} + +RpcConnection::RpcConnection(LockFreeRpcEngine *engine) + : engine_(engine), + connected_(kNotYetConnected) {} + +::asio::io_service &RpcConnection::io_service() { + return engine_->io_service(); +} + 
+void RpcConnection::StartReading() { + auto shared_this = shared_from_this(); + io_service().post([shared_this, this] () { + OnRecvCompleted(::asio::error_code(), 0); + }); +} + +void RpcConnection::HandshakeComplete(const Status &s) { + std::lock_guard<std::mutex> state_lock(connection_state_lock_); + + LOG_TRACE(kRPC, << "RpcConnectionImpl::HandshakeComplete called"); + + if (s.ok()) { + if (connected_ == kHandshaking) { + auto shared_this = shared_from_this(); + + connected_ = kAuthenticating; + if (auth_info_.useSASL()) { +#ifdef USE_SASL + sasl_protocol_ = std::make_shared<SaslProtocol>(cluster_name_, auth_info_, shared_from_this()); + sasl_protocol_->SetEventHandlers(event_handlers_); + sasl_protocol_->Authenticate([shared_this, this]( + const Status & status, const AuthInfo & new_auth_info) { + AuthComplete(status, new_auth_info); } ); +#else + AuthComplete_locked(Status::Error("SASL is required, but no SASL library was found"), auth_info_); +#endif + } else { + AuthComplete_locked(Status::OK(), auth_info_); + } + } + } else { + CommsError(s); + }; +} + +void RpcConnection::AuthComplete(const Status &s, const AuthInfo & new_auth_info) { + std::lock_guard<std::mutex> state_lock(connection_state_lock_); + AuthComplete_locked(s, new_auth_info); +} + +void RpcConnection::AuthComplete_locked(const Status &s, const AuthInfo & new_auth_info) { + assert(lock_held(connection_state_lock_)); // Must be holding lock before calling + LOG_TRACE(kRPC, << "RpcConnectionImpl::AuthComplete called"); + + // Free the sasl_protocol object + sasl_protocol_.reset(); + + if (s.ok()) { + auth_info_ = new_auth_info; + + auto shared_this = shared_from_this(); + SendContext([shared_this, this](const Status & s) { + ContextComplete(s); + }); + } else { + CommsError(s); + }; +} + +void RpcConnection::ContextComplete(const Status &s) { + std::lock_guard<std::mutex> state_lock(connection_state_lock_); + + LOG_TRACE(kRPC, << "RpcConnectionImpl::ContextComplete called"); + + if (s.ok()) { + 
if (connected_ == kAuthenticating) { + connected_ = kConnected; + } + FlushPendingRequests(); + } else { + CommsError(s); + }; +} + +void RpcConnection::AsyncFlushPendingRequests() { + std::shared_ptr<RpcConnection> shared_this = shared_from_this(); + io_service().post([shared_this, this]() { + std::lock_guard<std::mutex> state_lock(connection_state_lock_); + + LOG_TRACE(kRPC, << "RpcConnection::AsyncFlushPendingRequests called (connected=" << ToString(connected_) << ")"); + + if (!request_over_the_wire_) { + FlushPendingRequests(); + } + }); +} + +Status RpcConnection::HandleRpcResponse(std::shared_ptr<Response> response) { + assert(lock_held(connection_state_lock_)); // Must be holding lock before calling + + response->ar.reset(new pbio::ArrayInputStream(&response->data_[0], response->data_.size())); + response->in.reset(new pbio::CodedInputStream(response->ar.get())); + response->in->PushLimit(response->data_.size()); + RpcResponseHeaderProto h; + ReadDelimitedPBMessage(response->in.get(), &h); + + auto req = RemoveFromRunningQueue(h.callid()); + if (!req) { + LOG_WARN(kRPC, << "RPC response with Unknown call id " << h.callid()); + if((int32_t)h.callid() == RpcEngine::kCallIdSasl) { + return Status::AuthenticationFailed("You have an unsecured client connecting to a secured server"); + } else { + return Status::Error("Rpc response with unknown call id"); + } + } + + Status status; + if(event_handlers_) { + event_response event_resp = event_handlers_->call(FS_NN_READ_EVENT, cluster_name_.c_str(), 0); +#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED + if (event_resp.response() == event_response::kTest_Error) { + status = event_resp.status(); + } +#endif + } + + if (status.ok() && h.has_exceptionclassname()) { + status = + Status::Exception(h.exceptionclassname().c_str(), h.errormsg().c_str()); + } + + if(status.get_server_exception_type() == Status::kStandbyException) { + LOG_WARN(kRPC, << "Tried to connect to standby. 
status = " << status.ToString()); + + // We got the request back, but it needs to be resent to the other NN + std::vector<std::shared_ptr<Request>> reqs_to_redirect = {req}; + PrependRequests_locked(reqs_to_redirect); + + CommsError(status); + return status; + } + + io_service().post([req, response, status]() { + req->OnResponseArrived(response->in.get(), status); // Never call back while holding a lock + }); + + return Status::OK(); +} + +void RpcConnection::HandleRpcTimeout(std::shared_ptr<Request> req, + const ::asio::error_code &ec) { + if (ec.value() == asio::error::operation_aborted) { + return; + } + + std::lock_guard<std::mutex> state_lock(connection_state_lock_); + auto r = RemoveFromRunningQueue(req->call_id()); + if (!r) { + // The RPC might have been finished and removed from the queue + return; + } + + Status stat = ToStatus(ec ? ec : make_error_code(::asio::error::timed_out)); + + r->OnResponseArrived(nullptr, stat); +} + +std::shared_ptr<std::string> RpcConnection::PrepareHandshakePacket() { + assert(lock_held(connection_state_lock_)); // Must be holding lock before calling + + /** From Client.java: + * + * Write the connection header - this is sent when connection is established + * +----------------------------------+ + * | "hrpc" 4 bytes | + * +----------------------------------+ + * | Version (1 byte) | + * +----------------------------------+ + * | Service Class (1 byte) | + * +----------------------------------+ + * | AuthProtocol (1 byte) | + * +----------------------------------+ + * + * AuthProtocol: 0->none, -33->SASL + */ + + char auth_protocol = auth_info_.useSASL() ? 
-33 : 0; + const char handshake_header[] = {'h', 'r', 'p', 'c', + RpcEngine::kRpcVersion, 0, auth_protocol}; + auto res = + std::make_shared<std::string>(handshake_header, sizeof(handshake_header)); + + return res; +} + +std::shared_ptr<std::string> RpcConnection::PrepareContextPacket() { + // This needs to be send after the SASL handshake, and + // after the SASL handshake (if any) + assert(lock_held(connection_state_lock_)); // Must be holding lock before calling + + auto res = std::make_shared<std::string>(); + + RpcRequestHeaderProto h; + h.set_rpckind(RPC_PROTOCOL_BUFFER); + h.set_rpcop(RpcRequestHeaderProto::RPC_FINAL_PACKET); + h.set_callid(RpcEngine::kCallIdConnectionContext); + h.set_clientid(engine_->client_name()); + + IpcConnectionContextProto handshake; + handshake.set_protocol(engine_->protocol_name()); + const std::string & user_name = auth_info_.getUser(); + if (!user_name.empty()) { + *handshake.mutable_userinfo()->mutable_effectiveuser() = user_name; + } + AddHeadersToPacket(res.get(), {&h, &handshake}, nullptr); + + return res; +} + +void RpcConnection::AsyncRpc( + const std::string &method_name, const ::google::protobuf::MessageLite *req, + std::shared_ptr<::google::protobuf::MessageLite> resp, + const RpcCallback &handler) { + std::lock_guard<std::mutex> state_lock(connection_state_lock_); + AsyncRpc_locked(method_name, req, resp, handler); +} + +void RpcConnection::AsyncRpc_locked( + const std::string &method_name, const ::google::protobuf::MessageLite *req, + std::shared_ptr<::google::protobuf::MessageLite> resp, + const RpcCallback &handler) { + assert(lock_held(connection_state_lock_)); // Must be holding lock before calling + + auto wrapped_handler = + [resp, handler](pbio::CodedInputStream *is, const Status &status) { + if (status.ok()) { + if (is) { // Connect messages will not have an is + ReadDelimitedPBMessage(is, resp.get()); + } + } + handler(status); + }; + + int call_id = (method_name != SASL_METHOD_NAME ? 
engine_->NextCallId() : RpcEngine::kCallIdSasl); + auto r = std::make_shared<Request>(engine_, method_name, call_id, req, + std::move(wrapped_handler)); + auto r_vector = std::vector<std::shared_ptr<Request> > (1, r); + SendRpcRequests(r_vector); +} + +void RpcConnection::AsyncRpc(const std::vector<std::shared_ptr<Request> > & requests) { + std::lock_guard<std::mutex> state_lock(connection_state_lock_); + SendRpcRequests(requests); +} + +void RpcConnection::SendRpcRequests(const std::vector<std::shared_ptr<Request> > & requests) { + LOG_TRACE(kRPC, << "RpcConnection::SendRpcRequests[] called; connected=" << ToString(connected_)); + assert(lock_held(connection_state_lock_)); // Must be holding lock before calling + + if (connected_ == kDisconnected) { + // Oops. The connection failed _just_ before the engine got a chance + // to send it. Register it as a failure + Status status = Status::ResourceUnavailable("RpcConnection closed before send."); + engine_->AsyncRpcCommsError(status, shared_from_this(), requests); + } else { + for (auto r: requests) { + if (r->method_name() != SASL_METHOD_NAME) + pending_requests_.push_back(r); + else + auth_requests_.push_back(r); + } + if (connected_ == kConnected || connected_ == kHandshaking || connected_ == kAuthenticating) { // Dont flush if we're waiting or handshaking + FlushPendingRequests(); + } + } +} + + +void RpcConnection::PreEnqueueRequests( + std::vector<std::shared_ptr<Request>> requests) { + // Public method - acquire lock + std::lock_guard<std::mutex> state_lock(connection_state_lock_); + + LOG_DEBUG(kRPC, << "RpcConnection::PreEnqueueRequests called"); + + assert(connected_ == kNotYetConnected); + + pending_requests_.insert(pending_requests_.end(), requests.begin(), + requests.end()); + // Don't start sending yet; will flush when connected +} + +// Only call when already holding conn state lock +void RpcConnection::PrependRequests_locked( std::vector<std::shared_ptr<Request>> requests) { + LOG_DEBUG(kRPC, << 
"RpcConnection::PrependRequests called"); + + pending_requests_.insert(pending_requests_.begin(), requests.begin(), + requests.end()); + // Don't start sending yet; will flush when connected +} + +void RpcConnection::SetEventHandlers(std::shared_ptr<LibhdfsEvents> event_handlers) { + std::lock_guard<std::mutex> state_lock(connection_state_lock_); + event_handlers_ = event_handlers; + if (sasl_protocol_) { + sasl_protocol_->SetEventHandlers(event_handlers); + } +} + +void RpcConnection::SetClusterName(std::string cluster_name) { + std::lock_guard<std::mutex> state_lock(connection_state_lock_); + cluster_name_ = cluster_name; +} + +void RpcConnection::CommsError(const Status &status) { + assert(lock_held(connection_state_lock_)); // Must be holding lock before calling + LOG_DEBUG(kRPC, << "RpcConnection::CommsError called"); + + Disconnect(); + + // Anything that has been queued to the connection (on the fly or pending) + // will get dinged for a retry + std::vector<std::shared_ptr<Request>> requestsToReturn; + std::transform(requests_on_fly_.begin(), requests_on_fly_.end(), + std::back_inserter(requestsToReturn), + std::bind(&RequestOnFlyMap::value_type::second, _1)); + requests_on_fly_.clear(); + + requestsToReturn.insert(requestsToReturn.end(), + std::make_move_iterator(pending_requests_.begin()), + std::make_move_iterator(pending_requests_.end())); + pending_requests_.clear(); + + engine_->AsyncRpcCommsError(status, shared_from_this(), requestsToReturn); +} + +void RpcConnection::ClearAndDisconnect(const ::asio::error_code &ec) { + Disconnect(); + std::vector<std::shared_ptr<Request>> requests; + std::transform(requests_on_fly_.begin(), requests_on_fly_.end(), + std::back_inserter(requests), + std::bind(&RequestOnFlyMap::value_type::second, _1)); + requests_on_fly_.clear(); + requests.insert(requests.end(), + std::make_move_iterator(pending_requests_.begin()), + std::make_move_iterator(pending_requests_.end())); + pending_requests_.clear(); + for (const auto &req 
: requests) { + req->OnResponseArrived(nullptr, ToStatus(ec)); + } +} + +std::shared_ptr<Request> RpcConnection::RemoveFromRunningQueue(int call_id) { + assert(lock_held(connection_state_lock_)); // Must be holding lock before calling + auto it = requests_on_fly_.find(call_id); + if (it == requests_on_fly_.end()) { + return std::shared_ptr<Request>(); + } + + auto req = it->second; + requests_on_fly_.erase(it); + return req; +} + +std::string RpcConnection::ToString(ConnectedState connected) { + switch(connected) { + case kNotYetConnected: return "NotYetConnected"; + case kConnecting: return "Connecting"; + case kHandshaking: return "Handshaking"; + case kAuthenticating: return "Authenticating"; + case kConnected: return "Connected"; + case kDisconnected: return "Disconnected"; + default: return "Invalid ConnectedState"; + } +} + +}// end namespace hdfs --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org