http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a542fb3/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.cc deleted file mode 100644 index ee9b704..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.cc +++ /dev/null @@ -1,506 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "rpc_engine.h" -#include "rpc_connection_impl.h" -#include "sasl_protocol.h" - -#include "RpcHeader.pb.h" -#include "ProtobufRpcEngine.pb.h" -#include "IpcConnectionContext.pb.h" - -namespace hdfs { - -namespace pb = ::google::protobuf; -namespace pbio = ::google::protobuf::io; - -using namespace ::hadoop::common; -using namespace ::std::placeholders; - -static void AddHeadersToPacket( - std::string *res, std::initializer_list<const pb::MessageLite *> headers, - const std::string *payload) { - int len = 0; - std::for_each( - headers.begin(), headers.end(), - [&len](const pb::MessageLite *v) { len += DelimitedPBMessageSize(v); }); - - if (payload) { - len += payload->size(); - } - - int net_len = htonl(len); - res->reserve(res->size() + sizeof(net_len) + len); - - pbio::StringOutputStream ss(res); - pbio::CodedOutputStream os(&ss); - os.WriteRaw(reinterpret_cast<const char *>(&net_len), sizeof(net_len)); - - uint8_t *buf = os.GetDirectBufferForNBytesAndAdvance(len); - assert(buf); - - std::for_each( - headers.begin(), headers.end(), [&buf](const pb::MessageLite *v) { - buf = pbio::CodedOutputStream::WriteVarint32ToArray(v->ByteSize(), buf); - buf = v->SerializeWithCachedSizesToArray(buf); - }); - - if (payload) { - buf = os.WriteStringToArray(*payload, buf); - } -} - -RpcConnection::~RpcConnection() {} - -RpcConnection::RpcConnection(std::shared_ptr<LockFreeRpcEngine> engine) - : engine_(engine), - connected_(kNotYetConnected) {} - -::asio::io_service *RpcConnection::GetIoService() { - std::shared_ptr<LockFreeRpcEngine> pinnedEngine = engine_.lock(); - if(!pinnedEngine) { - LOG_ERROR(kRPC, << "RpcConnection@" << this << " attempted to access invalid RpcEngine"); - return nullptr; - } - - return &pinnedEngine->io_service(); -} - -void RpcConnection::StartReading() { - auto shared_this = shared_from_this(); - ::asio::io_service *service = GetIoService(); - if(!service) { - LOG_ERROR(kRPC, << "RpcConnection@" << this << " attempted to access invalid IoService"); - return; - } - - service->post([shared_this, this] () { - OnRecvCompleted(::asio::error_code(), 0); - }); -} - -void RpcConnection::HandshakeComplete(const Status &s) { - std::lock_guard<std::mutex> state_lock(connection_state_lock_); - - LOG_TRACE(kRPC, << "RpcConnectionImpl::HandshakeComplete called"); - - if (s.ok()) { - if (connected_ == kHandshaking) { - auto shared_this = shared_from_this(); - - connected_ = kAuthenticating; - if (auth_info_.useSASL()) { -#ifdef USE_SASL - sasl_protocol_ = std::make_shared<SaslProtocol>(cluster_name_, auth_info_, shared_from_this()); - sasl_protocol_->SetEventHandlers(event_handlers_); - sasl_protocol_->Authenticate([shared_this, this]( - const Status & status, const AuthInfo & new_auth_info) { - AuthComplete(status, new_auth_info); } ); -#else - AuthComplete_locked(Status::Error("SASL is required, but no SASL library was found"), auth_info_); -#endif - } else { - AuthComplete_locked(Status::OK(), auth_info_); - } - } - } else { - CommsError(s); - }; -} - -void RpcConnection::AuthComplete(const Status &s, const AuthInfo & new_auth_info) { - std::lock_guard<std::mutex> state_lock(connection_state_lock_); - AuthComplete_locked(s, new_auth_info); -} - -void RpcConnection::AuthComplete_locked(const Status &s, const AuthInfo & new_auth_info) { - assert(lock_held(connection_state_lock_)); // Must be holding lock before calling - LOG_TRACE(kRPC, << "RpcConnectionImpl::AuthComplete called"); - - // Free the sasl_protocol object - sasl_protocol_.reset(); - - if (s.ok()) { - auth_info_ = new_auth_info; - - auto shared_this = shared_from_this(); - SendContext([shared_this, this](const Status & s) { - ContextComplete(s); - }); - } else { - CommsError(s); - }; -} - -void RpcConnection::ContextComplete(const Status &s) { - std::lock_guard<std::mutex> state_lock(connection_state_lock_); - - LOG_TRACE(kRPC, << "RpcConnectionImpl::ContextComplete called"); - - if (s.ok()) { - if (connected_ == kAuthenticating) { - connected_ = kConnected; - } - FlushPendingRequests(); - } else { - CommsError(s); - }; -} - -void RpcConnection::AsyncFlushPendingRequests() { - std::shared_ptr<RpcConnection> shared_this = shared_from_this(); - - ::asio::io_service *service = GetIoService(); - if(!service) { - LOG_ERROR(kRPC, << "RpcConnection@" << this << " attempted to access invalid IoService"); - return; - } - - service->post([shared_this, this]() { - std::lock_guard<std::mutex> state_lock(connection_state_lock_); - - LOG_TRACE(kRPC, << "RpcConnection::AsyncFlushPendingRequests called (connected=" << ToString(connected_) << ")"); - - if (!outgoing_request_) { - FlushPendingRequests(); - } - }); -} - -Status RpcConnection::HandleRpcResponse(std::shared_ptr<Response> response) { - assert(lock_held(connection_state_lock_)); // Must be holding lock before calling - - response->ar.reset(new pbio::ArrayInputStream(&response->data_[0], response->data_.size())); - response->in.reset(new pbio::CodedInputStream(response->ar.get())); - response->in->PushLimit(response->data_.size()); - RpcResponseHeaderProto h; - ReadDelimitedPBMessage(response->in.get(), &h); - - auto req = RemoveFromRunningQueue(h.callid()); - if (!req) { - LOG_WARN(kRPC, << "RPC response with Unknown call id " << (int32_t)h.callid()); - if((int32_t)h.callid() == RpcEngine::kCallIdSasl) { - return Status::AuthenticationFailed("You have an unsecured client connecting to a secured server"); - } else if((int32_t)h.callid() == RpcEngine::kCallIdAuthorizationFailed) { - return Status::AuthorizationFailed("RPC call id indicates an authorization failure"); - } else { - return Status::Error("Rpc response with unknown call id"); - } - } - - Status status; - if(event_handlers_) { - event_response event_resp = event_handlers_->call(FS_NN_READ_EVENT, cluster_name_.c_str(), 0); -#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED - if (event_resp.response_type() == event_response::kTest_Error) { - status = event_resp.status(); - } -#endif - } - - if (status.ok() && h.has_exceptionclassname()) { - status = - Status::Exception(h.exceptionclassname().c_str(), h.errormsg().c_str()); - } - - if(status.get_server_exception_type() == Status::kStandbyException) { - LOG_WARN(kRPC, << "Tried to connect to standby. status = " << status.ToString()); - - // We got the request back, but it needs to be resent to the other NN - std::vector<std::shared_ptr<Request>> reqs_to_redirect = {req}; - PrependRequests_locked(reqs_to_redirect); - - CommsError(status); - return status; - } - - ::asio::io_service *service = GetIoService(); - if(!service) { - LOG_ERROR(kRPC, << "RpcConnection@" << this << " attempted to access invalid IoService"); - return Status::Error("RpcConnection attempted to access invalid IoService"); - } - - service->post([req, response, status]() { - req->OnResponseArrived(response->in.get(), status); // Never call back while holding a lock - }); - - return Status::OK(); -} - -void RpcConnection::HandleRpcTimeout(std::shared_ptr<Request> req, - const ::asio::error_code &ec) { - if (ec.value() == asio::error::operation_aborted) { - return; - } - - std::lock_guard<std::mutex> state_lock(connection_state_lock_); - auto r = RemoveFromRunningQueue(req->call_id()); - if (!r) { - // The RPC might have been finished and removed from the queue - return; - } - - Status stat = ToStatus(ec ? ec : make_error_code(::asio::error::timed_out)); - - r->OnResponseArrived(nullptr, stat); -} - -std::shared_ptr<std::string> RpcConnection::PrepareHandshakePacket() { - assert(lock_held(connection_state_lock_)); // Must be holding lock before calling - - /** From Client.java: - * - * Write the connection header - this is sent when connection is established - * +----------------------------------+ - * | "hrpc" 4 bytes | - * +----------------------------------+ - * | Version (1 byte) | - * +----------------------------------+ - * | Service Class (1 byte) | - * +----------------------------------+ - * | AuthProtocol (1 byte) | - * +----------------------------------+ - * - * AuthProtocol: 0->none, -33->SASL - */ - - char auth_protocol = auth_info_.useSASL() ? -33 : 0; - const char handshake_header[] = {'h', 'r', 'p', 'c', - RpcEngine::kRpcVersion, 0, auth_protocol}; - auto res = - std::make_shared<std::string>(handshake_header, sizeof(handshake_header)); - - return res; -} - -std::shared_ptr<std::string> RpcConnection::PrepareContextPacket() { - // This needs to be send after the SASL handshake, and - // after the SASL handshake (if any) - assert(lock_held(connection_state_lock_)); // Must be holding lock before calling - - std::shared_ptr<LockFreeRpcEngine> pinnedEngine = engine_.lock(); - if(!pinnedEngine) { - LOG_ERROR(kRPC, << "RpcConnection@" << this << " attempted to access invalid RpcEngine"); - return std::make_shared<std::string>(); - } - - std::shared_ptr<std::string> serializedPacketBuffer = std::make_shared<std::string>(); - - RpcRequestHeaderProto headerProto; - headerProto.set_rpckind(RPC_PROTOCOL_BUFFER); - headerProto.set_rpcop(RpcRequestHeaderProto::RPC_FINAL_PACKET); - headerProto.set_callid(RpcEngine::kCallIdConnectionContext); - headerProto.set_clientid(pinnedEngine->client_name()); - - IpcConnectionContextProto handshakeContextProto; - handshakeContextProto.set_protocol(pinnedEngine->protocol_name()); - const std::string & user_name = auth_info_.getUser(); - if (!user_name.empty()) { - *handshakeContextProto.mutable_userinfo()->mutable_effectiveuser() = user_name; - } - AddHeadersToPacket(serializedPacketBuffer.get(), {&headerProto, &handshakeContextProto}, nullptr); - - return serializedPacketBuffer; -} - -void RpcConnection::AsyncRpc( - const std::string &method_name, const ::google::protobuf::MessageLite *req, - std::shared_ptr<::google::protobuf::MessageLite> resp, - const RpcCallback &handler) { - std::lock_guard<std::mutex> state_lock(connection_state_lock_); - AsyncRpc_locked(method_name, req, resp, handler); -} - -void RpcConnection::AsyncRpc_locked( - const std::string &method_name, const ::google::protobuf::MessageLite *req, - std::shared_ptr<::google::protobuf::MessageLite> resp, - const RpcCallback &handler) { - assert(lock_held(connection_state_lock_)); // Must be holding lock before calling - - auto wrapped_handler = - [resp, handler](pbio::CodedInputStream *is, const Status &status) { - if (status.ok()) { - if (is) { // Connect messages will not have an is - ReadDelimitedPBMessage(is, resp.get()); - } - } - handler(status); - }; - - - std::shared_ptr<Request> rpcRequest; - { // Scope to minimize how long RpcEngine's lifetime may be extended - std::shared_ptr<LockFreeRpcEngine> pinnedEngine = engine_.lock(); - if(!pinnedEngine) { - LOG_ERROR(kRPC, << "RpcConnection@" << this << " attempted to access invalid RpcEngine"); - handler(Status::Error("Invalid RpcEngine access.")); - return; - } - - int call_id = (method_name != SASL_METHOD_NAME ? pinnedEngine->NextCallId() : RpcEngine::kCallIdSasl); - rpcRequest = std::make_shared<Request>(pinnedEngine, method_name, call_id, - req, std::move(wrapped_handler)); - } - - SendRpcRequests({rpcRequest}); -} - -void RpcConnection::AsyncRpc(const std::vector<std::shared_ptr<Request> > & requests) { - std::lock_guard<std::mutex> state_lock(connection_state_lock_); - SendRpcRequests(requests); -} - -void RpcConnection::SendRpcRequests(const std::vector<std::shared_ptr<Request> > & requests) { - LOG_TRACE(kRPC, << "RpcConnection::SendRpcRequests[] called; connected=" << ToString(connected_)); - assert(lock_held(connection_state_lock_)); // Must be holding lock before calling - - if (connected_ == kDisconnected) { - // Oops. The connection failed _just_ before the engine got a chance - // to send it. Register it as a failure - Status status = Status::ResourceUnavailable("RpcConnection closed before send."); - - std::shared_ptr<LockFreeRpcEngine> pinnedEngine = engine_.lock(); - if(!pinnedEngine) { - LOG_ERROR(kRPC, << "RpcConnection@" << this << " attempted to access invalid RpcEngine"); - return; - } - - pinnedEngine->AsyncRpcCommsError(status, shared_from_this(), requests); - } else { - for (auto request : requests) { - if (request->method_name() != SASL_METHOD_NAME) - pending_requests_.push_back(request); - else - auth_requests_.push_back(request); - } - if (connected_ == kConnected || connected_ == kHandshaking || connected_ == kAuthenticating) { // Dont flush if we're waiting or handshaking - FlushPendingRequests(); - } - } -} - - -void RpcConnection::PreEnqueueRequests( - std::vector<std::shared_ptr<Request>> requests) { - // Public method - acquire lock - std::lock_guard<std::mutex> state_lock(connection_state_lock_); - - LOG_DEBUG(kRPC, << "RpcConnection::PreEnqueueRequests called"); - - assert(connected_ == kNotYetConnected); - - pending_requests_.insert(pending_requests_.end(), requests.begin(), - requests.end()); - // Don't start sending yet; will flush when connected -} - -// Only call when already holding conn state lock -void RpcConnection::PrependRequests_locked( std::vector<std::shared_ptr<Request>> requests) { - LOG_DEBUG(kRPC, << "RpcConnection::PrependRequests called"); - - pending_requests_.insert(pending_requests_.begin(), requests.begin(), - requests.end()); - // Don't start sending yet; will flush when connected -} - -void RpcConnection::SetEventHandlers(std::shared_ptr<LibhdfsEvents> event_handlers) { - std::lock_guard<std::mutex> state_lock(connection_state_lock_); - event_handlers_ = event_handlers; - if (sasl_protocol_) { - sasl_protocol_->SetEventHandlers(event_handlers); - } -} - -void RpcConnection::SetClusterName(std::string cluster_name) { - std::lock_guard<std::mutex> state_lock(connection_state_lock_); - cluster_name_ = cluster_name; -} - -void RpcConnection::SetAuthInfo(const AuthInfo& auth_info) { - std::lock_guard<std::mutex> state_lock(connection_state_lock_); - auth_info_ = auth_info; -} - -void RpcConnection::CommsError(const Status &status) { - assert(lock_held(connection_state_lock_)); // Must be holding lock before calling - LOG_DEBUG(kRPC, << "RpcConnection::CommsError called"); - - Disconnect(); - - // Anything that has been queued to the connection (on the fly or pending) - // will get dinged for a retry - std::vector<std::shared_ptr<Request>> requestsToReturn; - std::transform(sent_requests_.begin(), sent_requests_.end(), - std::back_inserter(requestsToReturn), - std::bind(&SentRequestMap::value_type::second, _1)); - sent_requests_.clear(); - - requestsToReturn.insert(requestsToReturn.end(), - std::make_move_iterator(pending_requests_.begin()), - std::make_move_iterator(pending_requests_.end())); - pending_requests_.clear(); - - std::shared_ptr<LockFreeRpcEngine> pinnedEngine = engine_.lock(); - if(!pinnedEngine) { - LOG_ERROR(kRPC, << "RpcConnection@" << this << " attempted to access an invalid RpcEngine"); - return; - } - - pinnedEngine->AsyncRpcCommsError(status, shared_from_this(), requestsToReturn); -} - -void RpcConnection::ClearAndDisconnect(const ::asio::error_code &ec) { - Disconnect(); - std::vector<std::shared_ptr<Request>> requests; - std::transform(sent_requests_.begin(), sent_requests_.end(), - std::back_inserter(requests), - std::bind(&SentRequestMap::value_type::second, _1)); - sent_requests_.clear(); - requests.insert(requests.end(), - std::make_move_iterator(pending_requests_.begin()), - std::make_move_iterator(pending_requests_.end())); - pending_requests_.clear(); - for (const auto &req : requests) { - req->OnResponseArrived(nullptr, ToStatus(ec)); - } -} - -std::shared_ptr<Request> RpcConnection::RemoveFromRunningQueue(int call_id) { - assert(lock_held(connection_state_lock_)); // Must be holding lock before calling - auto it = sent_requests_.find(call_id); - if (it == sent_requests_.end()) { - return std::shared_ptr<Request>(); - } - - auto req = it->second; - sent_requests_.erase(it); - return req; -} - -std::string RpcConnection::ToString(ConnectedState connected) { - switch(connected) { - case kNotYetConnected: return "NotYetConnected"; - case kConnecting: return "Connecting"; - case kHandshaking: return "Handshaking"; - case kAuthenticating: return "Authenticating"; - case kConnected: return "Connected"; - case kDisconnected: return "Disconnected"; - default: return "Invalid ConnectedState"; - } -} - -}// end namespace hdfs
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a542fb3/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.h deleted file mode 100644 index 8e579a2..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.h +++ /dev/null @@ -1,463 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef LIB_RPC_RPC_CONNECTION_IMPL_H_ -#define LIB_RPC_RPC_CONNECTION_IMPL_H_ - -#include "rpc_connection.h" -#include "rpc_engine.h" -#include "request.h" - -#include "common/auth_info.h" -#include "common/logging.h" -#include "common/util.h" -#include "common/libhdfs_events_impl.h" - -#include <asio/connect.hpp> -#include <asio/read.hpp> -#include <asio/write.hpp> - -#include <system_error> - -namespace hdfs { - -template <class Socket> -class RpcConnectionImpl : public RpcConnection { -public: - MEMCHECKED_CLASS(RpcConnectionImpl) - - RpcConnectionImpl(std::shared_ptr<RpcEngine> engine); - virtual ~RpcConnectionImpl() override; - - virtual void Connect(const std::vector<::asio::ip::tcp::endpoint> &server, - const AuthInfo & auth_info, - RpcCallback &handler) override; - virtual void ConnectAndFlush( - const std::vector<::asio::ip::tcp::endpoint> &server) override; - virtual void SendHandshake(RpcCallback &handler) override; - virtual void SendContext(RpcCallback &handler) override; - virtual void Disconnect() override; - virtual void OnSendCompleted(const ::asio::error_code &ec, - size_t transferred) override; - virtual void OnRecvCompleted(const ::asio::error_code &ec, - size_t transferred) override; - virtual void FlushPendingRequests() override; - - - Socket &TEST_get_mutable_socket() { return socket_; } - - void TEST_set_connected(bool connected) { connected_ = connected ? kConnected : kNotYetConnected; } - - private: - const Options options_; - ::asio::ip::tcp::endpoint current_endpoint_; - std::vector<::asio::ip::tcp::endpoint> additional_endpoints_; - Socket socket_; - ::asio::deadline_timer connect_timer_; - - void ConnectComplete(const ::asio::error_code &ec, const ::asio::ip::tcp::endpoint &remote); -}; - -template <class Socket> -RpcConnectionImpl<Socket>::RpcConnectionImpl(std::shared_ptr<RpcEngine> engine) - : RpcConnection(engine), - options_(engine->options()), - socket_(engine->io_service()), - connect_timer_(engine->io_service()) -{ - LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called &" << (void*)this); -} - -template <class Socket> -RpcConnectionImpl<Socket>::~RpcConnectionImpl() { - LOG_DEBUG(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called &" << (void*)this); - - if (pending_requests_.size() > 0) - LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the pending queue"); - if (sent_requests_.size() > 0) - LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the sent_requests queue"); -} - -template <class Socket> -void RpcConnectionImpl<Socket>::Connect( - const std::vector<::asio::ip::tcp::endpoint> &server, - const AuthInfo & auth_info, - RpcCallback &handler) { - LOG_TRACE(kRPC, << "RpcConnectionImpl::Connect called"); - - this->auth_info_ = auth_info; - - std::shared_ptr<Request> connectionRequest; - { // Scope to minimize how long RpcEngine's lifetime may be extended - std::shared_ptr<LockFreeRpcEngine> pinned_engine = engine_.lock(); - if(!pinned_engine) { - LOG_ERROR(kRPC, << "RpcConnectionImpl@" << this << " attempted to access invalid RpcEngine"); - handler(Status::Error("Invalid RpcEngine access.")); - return; - } - - connectionRequest = std::make_shared<Request>(pinned_engine, - [handler](::google::protobuf::io::CodedInputStream *is,const Status &status) { - (void)is; - handler(status); - }); - } - - pending_requests_.push_back(connectionRequest); - this->ConnectAndFlush(server); // need "this" so compiler can infer type of CAF -} - -template <class Socket> -void RpcConnectionImpl<Socket>::ConnectAndFlush( - const std::vector<::asio::ip::tcp::endpoint> &server) { - - LOG_INFO(kRPC, << "ConnectAndFlush called"); - std::lock_guard<std::mutex> state_lock(connection_state_lock_); - - if (server.empty()) { - Status s = Status::InvalidArgument("No endpoints provided"); - CommsError(s); - return; - } - - if (connected_ == kConnected) { - FlushPendingRequests(); - return; - } - if (connected_ != kNotYetConnected) { - LOG_WARN(kRPC, << "RpcConnectionImpl::ConnectAndFlush called while connected=" << ToString(connected_)); - return; - } - connected_ = kConnecting; - - // Take the first endpoint, but remember the alternatives for later - additional_endpoints_ = server; - ::asio::ip::tcp::endpoint first_endpoint = additional_endpoints_.front(); - additional_endpoints_.erase(additional_endpoints_.begin()); - current_endpoint_ = first_endpoint; - - auto shared_this = shared_from_this(); - socket_.async_connect(first_endpoint, [shared_this, this, first_endpoint](const ::asio::error_code &ec) { - ConnectComplete(ec, first_endpoint); - }); - - // Prompt the timer to timeout - auto weak_this = std::weak_ptr<RpcConnection>(shared_this); - connect_timer_.expires_from_now( - std::chrono::milliseconds(options_.rpc_connect_timeout)); - connect_timer_.async_wait([shared_this, this, first_endpoint](const ::asio::error_code &ec) { - if (ec) - ConnectComplete(ec, first_endpoint); - else - ConnectComplete(make_error_code(asio::error::host_unreachable), first_endpoint); - }); -} - -template <class Socket> -void RpcConnectionImpl<Socket>::ConnectComplete(const ::asio::error_code &ec, const ::asio::ip::tcp::endpoint & remote) { - auto shared_this = RpcConnectionImpl<Socket>::shared_from_this(); - std::lock_guard<std::mutex> state_lock(connection_state_lock_); - connect_timer_.cancel(); - - LOG_TRACE(kRPC, << "RpcConnectionImpl::ConnectComplete called"); - - // Could be an old async connect returning a result after we've moved on - if (remote != current_endpoint_) { - LOG_DEBUG(kRPC, << "Got ConnectComplete for " << remote << " but current_endpoint_ is " << current_endpoint_); - return; - } - if (connected_ != kConnecting) { - LOG_DEBUG(kRPC, << "Got ConnectComplete but current state is " << connected_);; - return; - } - - Status status = ToStatus(ec); - if(event_handlers_) { - event_response event_resp = event_handlers_->call(FS_NN_CONNECT_EVENT, cluster_name_.c_str(), 0); -#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED - if (event_resp.response_type() == event_response::kTest_Error) { - status = event_resp.status(); - } -#endif - } - - if (status.ok()) { - StartReading(); - SendHandshake([shared_this, this](const Status & s) { - HandshakeComplete(s); - }); - } else { - LOG_DEBUG(kRPC, << "Rpc connection failed; err=" << status.ToString());; - std::string err = SafeDisconnect(get_asio_socket_ptr(&socket_)); - if(!err.empty()) { - LOG_INFO(kRPC, << "Rpc connection failed to connect to endpoint, error closing connection: " << err); - } - - if (!additional_endpoints_.empty()) { - // If we have additional endpoints, keep trying until we either run out or - // hit one - ::asio::ip::tcp::endpoint next_endpoint = additional_endpoints_.front(); - additional_endpoints_.erase(additional_endpoints_.begin()); - current_endpoint_ = next_endpoint; - - socket_.async_connect(next_endpoint, [shared_this, this, next_endpoint](const ::asio::error_code &ec) { - ConnectComplete(ec, next_endpoint); - }); - connect_timer_.expires_from_now( - std::chrono::milliseconds(options_.rpc_connect_timeout)); - connect_timer_.async_wait([shared_this, this, next_endpoint](const ::asio::error_code &ec) { - if (ec) - ConnectComplete(ec, next_endpoint); - else - ConnectComplete(make_error_code(asio::error::host_unreachable), next_endpoint); - }); - } else { - CommsError(status); - } - } -} - -template <class Socket> -void RpcConnectionImpl<Socket>::SendHandshake(RpcCallback &handler) { - assert(lock_held(connection_state_lock_)); // Must be holding lock before calling - - LOG_TRACE(kRPC, << "RpcConnectionImpl::SendHandshake called"); - connected_ = kHandshaking; - - auto shared_this = shared_from_this(); - auto handshake_packet = PrepareHandshakePacket(); - ::asio::async_write(socket_, asio::buffer(*handshake_packet), - [handshake_packet, handler, shared_this, this]( - const ::asio::error_code &ec, size_t) { - Status status = ToStatus(ec); - handler(status); - }); -} - -template <class Socket> -void RpcConnectionImpl<Socket>::SendContext(RpcCallback &handler) { - assert(lock_held(connection_state_lock_)); // Must be holding lock before calling - - LOG_TRACE(kRPC, << "RpcConnectionImpl::SendContext called"); - - auto shared_this = shared_from_this(); - auto context_packet = PrepareContextPacket(); - ::asio::async_write(socket_, asio::buffer(*context_packet), - [context_packet, handler, shared_this, this]( - const ::asio::error_code &ec, size_t) { - Status status = ToStatus(ec); - handler(status); - }); -} - -template <class Socket> -void RpcConnectionImpl<Socket>::OnSendCompleted(const ::asio::error_code &ec, - size_t) { - using std::placeholders::_1; - using std::placeholders::_2; - std::lock_guard<std::mutex> state_lock(connection_state_lock_); - - LOG_TRACE(kRPC, << "RpcConnectionImpl::OnSendCompleted called"); - - outgoing_request_.reset(); - if (ec) { - LOG_WARN(kRPC, << "Network error during RPC write: " << ec.message()); - CommsError(ToStatus(ec)); - return; - } - - FlushPendingRequests(); -} - -template <class Socket> -void RpcConnectionImpl<Socket>::FlushPendingRequests() { - using namespace ::std::placeholders; - - // Lock should be held - assert(lock_held(connection_state_lock_)); - - LOG_TRACE(kRPC, << "RpcConnectionImpl::FlushPendingRequests called"); - - // Don't send if we don't need to - if (outgoing_request_) { - return; - } - - std::shared_ptr<Request> req; - switch (connected_) { - case kNotYetConnected: - return; - case kConnecting: - return; - case kHandshaking: - return; - case kAuthenticating: - if (auth_requests_.empty()) { - return; - } - req = auth_requests_.front(); - auth_requests_.erase(auth_requests_.begin()); - break; - case kConnected: - if (pending_requests_.empty()) { - return; - } - req = pending_requests_.front(); - pending_requests_.erase(pending_requests_.begin()); - break; - case kDisconnected: - LOG_DEBUG(kRPC, << "RpcConnectionImpl::FlushPendingRequests attempted to flush a " << ToString(connected_) << " connection"); - return; - default: - LOG_DEBUG(kRPC, << "RpcConnectionImpl::FlushPendingRequests invalid state: " << ToString(connected_)); - return; - } - - std::shared_ptr<RpcConnection> shared_this = shared_from_this(); - auto weak_this = std::weak_ptr<RpcConnection>(shared_this); - auto weak_req = std::weak_ptr<Request>(req); - - std::shared_ptr<std::string> payload = std::make_shared<std::string>(); - req->GetPacket(payload.get()); - if (!payload->empty()) { - assert(sent_requests_.find(req->call_id()) == sent_requests_.end()); - sent_requests_[req->call_id()] = req; - outgoing_request_ = req; - - req->timer().expires_from_now( - std::chrono::milliseconds(options_.rpc_timeout)); - req->timer().async_wait([weak_this, weak_req, this](const ::asio::error_code &ec) { - auto timeout_this = weak_this.lock(); - auto timeout_req = weak_req.lock(); - if (timeout_this && timeout_req) - this->HandleRpcTimeout(timeout_req, ec); - }); - - asio::async_write(socket_, asio::buffer(*payload), - [shared_this, this, payload](const ::asio::error_code &ec, - size_t size) { - OnSendCompleted(ec, size); - }); - } else { // Nothing to send for this request, inform the handler immediately - ::asio::io_service *service = GetIoService(); - if(!service) { - LOG_ERROR(kRPC, << "RpcConnectionImpl@" << this << " attempted to access null IoService"); - // No easy way to bail out of this context, but the only way to get here is when - // the FileSystem is being destroyed. - return; - } - - service->post( - // Never hold locks when calling a callback - [req]() { req->OnResponseArrived(nullptr, Status::OK()); } - ); - - // Reschedule to flush the next one - AsyncFlushPendingRequests(); - } -} - - -template <class Socket> -void RpcConnectionImpl<Socket>::OnRecvCompleted(const ::asio::error_code &original_ec, - size_t) { - using std::placeholders::_1; - using std::placeholders::_2; - std::lock_guard<std::mutex> state_lock(connection_state_lock_); - - ::asio::error_code my_ec(original_ec); - - LOG_TRACE(kRPC, << "RpcConnectionImpl::OnRecvCompleted called"); - - std::shared_ptr<RpcConnection> shared_this = shared_from_this(); - - if(event_handlers_) { - event_response event_resp = event_handlers_->call(FS_NN_READ_EVENT, cluster_name_.c_str(), 0); -#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED - if (event_resp.response_type() == event_response::kTest_Error) { - my_ec = std::make_error_code(std::errc::network_down); - } -#endif - } - - switch (my_ec.value()) { - case 0: - // No errors - break; - case asio::error::operation_aborted: - // The event loop has been shut down. Ignore the error. - return; - default: - LOG_WARN(kRPC, << "Network error during RPC read: " << my_ec.message()); - CommsError(ToStatus(my_ec)); - return; - } - - if (!current_response_state_) { /* start a new one */ - current_response_state_ = std::make_shared<Response>(); - } - - if (current_response_state_->state_ == Response::kReadLength) { - current_response_state_->state_ = Response::kReadContent; - auto buf = ::asio::buffer(reinterpret_cast<char *>(¤t_response_state_->length_), - sizeof(current_response_state_->length_)); - asio::async_read( - socket_, buf, - [shared_this, this](const ::asio::error_code &ec, size_t size) { - OnRecvCompleted(ec, size); - }); - } else if (current_response_state_->state_ == Response::kReadContent) { - current_response_state_->state_ = Response::kParseResponse; - current_response_state_->length_ = ntohl(current_response_state_->length_); - current_response_state_->data_.resize(current_response_state_->length_); - asio::async_read( - socket_, ::asio::buffer(current_response_state_->data_), - [shared_this, this](const ::asio::error_code &ec, size_t size) { - OnRecvCompleted(ec, size); - }); - } else if (current_response_state_->state_ == Response::kParseResponse) { - // Check return status from the RPC response. We may have received a msg - // indicating a server side error. - - Status stat = HandleRpcResponse(current_response_state_); - - if(stat.get_server_exception_type() == Status::kStandbyException) { - // May need to bail out, connect to new NN, and restart loop - LOG_INFO(kRPC, << "Communicating with standby NN, attempting to reconnect"); - } - - current_response_state_ = nullptr; - StartReading(); - } -} - -template <class Socket> -void RpcConnectionImpl<Socket>::Disconnect() { - assert(lock_held(connection_state_lock_)); // Must be holding lock before calling - - LOG_INFO(kRPC, << "RpcConnectionImpl::Disconnect called"); - - outgoing_request_.reset(); - if (connected_ == kConnecting || connected_ == kHandshaking || connected_ == kAuthenticating || connected_ == kConnected) { - // Don't print out errors, we were expecting a disconnect here - SafeDisconnect(get_asio_socket_ptr(&socket_)); - } - connected_ = kDisconnected; -} -} - -#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a542fb3/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc deleted file mode 100644 index 0ca7c6a..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc +++ /dev/null @@ -1,334 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "rpc_engine.h" -#include "rpc_connection_impl.h" -#include "common/util.h" -#include "common/logging.h" -#include "common/namenode_info.h" -#include "common/optional_wrapper.h" - -#include <algorithm> - -namespace hdfs { - -template <class T> -using optional = std::experimental::optional<T>; - - -RpcEngine::RpcEngine(::asio::io_service *io_service, const Options &options, - const std::string &client_name, const std::string &user_name, - const char *protocol_name, int protocol_version) - : io_service_(io_service), - options_(options), - client_name_(client_name), - client_id_(getRandomClientId()), - protocol_name_(protocol_name), - protocol_version_(protocol_version), - call_id_(0), - retry_timer(*io_service), - event_handlers_(std::make_shared<LibhdfsEvents>()), - connect_canceled_(false) -{ - LOG_DEBUG(kRPC, << "RpcEngine::RpcEngine called"); - - auth_info_.setUser(user_name); - if (options.authentication == Options::kKerberos) { - auth_info_.setMethod(AuthInfo::kKerberos); - } -} - -void RpcEngine::Connect(const std::string &cluster_name, - const std::vector<ResolvedNamenodeInfo> servers, - RpcCallback &handler) { - std::lock_guard<std::mutex> state_lock(engine_state_lock_); - LOG_DEBUG(kRPC, << "RpcEngine::Connect called"); - - last_endpoints_ = servers[0].endpoints; - cluster_name_ = cluster_name; - LOG_TRACE(kRPC, << "Got cluster name \"" << cluster_name << "\" in RpcEngine::Connect") - - ha_persisted_info_.reset(new HANamenodeTracker(servers, io_service_, event_handlers_)); - if(!ha_persisted_info_->is_enabled()) { - ha_persisted_info_.reset(); - } - - // Construct retry policy after we determine if config is HA - retry_policy_ = MakeRetryPolicy(options_); - - conn_ = InitializeConnection(); - conn_->Connect(last_endpoints_, auth_info_, handler); -} - -bool RpcEngine::CancelPendingConnect() { - if(connect_canceled_) { - LOG_DEBUG(kRPC, << "RpcEngine@" << this << "::CancelPendingConnect called more than once"); - return false; - } - - connect_canceled_ = true; - return true; -} - -void RpcEngine::Shutdown() { - LOG_DEBUG(kRPC, << "RpcEngine::Shutdown called"); - io_service_->post([this]() { - std::lock_guard<std::mutex> state_lock(engine_state_lock_); - conn_.reset(); - }); -} - -std::unique_ptr<const RetryPolicy> RpcEngine::MakeRetryPolicy(const Options &options) { - LOG_DEBUG(kRPC, << "RpcEngine::MakeRetryPolicy called"); - - if(ha_persisted_info_) { - LOG_INFO(kRPC, << "Cluster is HA configued so policy will default to HA until a knob is implemented"); - return std::unique_ptr<RetryPolicy>(new FixedDelayWithFailover(options.rpc_retry_delay_ms, - options.max_rpc_retries, - options.failover_max_retries, - options.failover_connection_max_retries)); - } else if (options.max_rpc_retries > 0) { - return std::unique_ptr<RetryPolicy>(new FixedDelayRetryPolicy(options.rpc_retry_delay_ms, - options.max_rpc_retries)); - } else { - return nullptr; - } -} - -std::string RpcEngine::getRandomClientId() -{ - /** - * The server is requesting a 16-byte UUID: - * https://github.com/c9n/hadoop/blob/master/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ClientId.java - * - * This function generates a 16-byte UUID (version 4): - * https://en.wikipedia.org/wiki/Universally_unique_identifier#Version_4_.28random.29 - **/ - std::vector<unsigned char>buf(16); - RAND_pseudo_bytes(&buf[0], buf.size()); - - //clear the first four bits of byte 6 then set the second bit - buf[6] = (buf[6] & 0x0f) | 0x40; - - //clear the second bit of byte 8 and set the first bit - buf[8] = (buf[8] & 0xbf) | 0x80; - return std::string(reinterpret_cast<const char*>(&buf[0]), buf.size()); -} - - - -void RpcEngine::TEST_SetRpcConnection(std::shared_ptr<RpcConnection> conn) { - conn_ = conn; - retry_policy_ = MakeRetryPolicy(options_); -} - -void RpcEngine::TEST_SetRetryPolicy(std::unique_ptr<const RetryPolicy> policy) { - retry_policy_ = std::move(policy); -} - -std::unique_ptr<const RetryPolicy> RpcEngine::TEST_GenerateRetryPolicyUsingOptions() { - return MakeRetryPolicy(options_); -} - -void RpcEngine::AsyncRpc( - const std::string &method_name, const ::google::protobuf::MessageLite *req, - const std::shared_ptr<::google::protobuf::MessageLite> &resp, - const std::function<void(const Status &)> &handler) { - std::lock_guard<std::mutex> state_lock(engine_state_lock_); - - LOG_TRACE(kRPC, << "RpcEngine::AsyncRpc called"); - - // In case user-side code isn't checking the status of Connect before doing RPC - if(connect_canceled_) { - io_service_->post( - [handler](){ handler(Status::Canceled()); } - ); - return; - } - - if (!conn_) { - conn_ = InitializeConnection(); - conn_->ConnectAndFlush(last_endpoints_); - } - conn_->AsyncRpc(method_name, req, resp, handler); -} - -std::shared_ptr<RpcConnection> RpcEngine::NewConnection() -{ - LOG_DEBUG(kRPC, << "RpcEngine::NewConnection called"); - - return std::make_shared<RpcConnectionImpl<::asio::ip::tcp::socket>>(shared_from_this()); -} - -std::shared_ptr<RpcConnection> RpcEngine::InitializeConnection() -{ - std::shared_ptr<RpcConnection> newConn = NewConnection(); - newConn->SetEventHandlers(event_handlers_); - newConn->SetClusterName(cluster_name_); - newConn->SetAuthInfo(auth_info_); - - return newConn; -} - -void RpcEngine::AsyncRpcCommsError( - const Status &status, - std::shared_ptr<RpcConnection> failedConnection, - std::vector<std::shared_ptr<Request>> pendingRequests) { - LOG_ERROR(kRPC, << "RpcEngine::AsyncRpcCommsError called; status=\"" << status.ToString() << "\" conn=" << failedConnection.get() << " reqs=" << std::to_string(pendingRequests.size())); - - io_service().post([this, status, failedConnection, pendingRequests]() { - RpcCommsError(status, failedConnection, pendingRequests); - }); -} - -void RpcEngine::RpcCommsError( - const Status &status, - std::shared_ptr<RpcConnection> failedConnection, - std::vector<std::shared_ptr<Request>> pendingRequests) { - LOG_WARN(kRPC, << "RpcEngine::RpcCommsError called; status=\"" << status.ToString() << "\" conn=" << failedConnection.get() << " reqs=" << std::to_string(pendingRequests.size())); - - std::lock_guard<std::mutex> state_lock(engine_state_lock_); - - // If the failed connection is the current one, shut it down - // It will be reconnected when there is work to do - if (failedConnection == conn_) { - LOG_INFO(kRPC, << "Disconnecting from failed RpcConnection"); - conn_.reset(); - } - - optional<RetryAction> head_action = optional<RetryAction>(); - - // Filter out anything with too many retries already - if(event_handlers_) { - event_handlers_->call(FS_NN_PRE_RPC_RETRY_EVENT, "RpcCommsError", - reinterpret_cast<int64_t>(this)); - } - - for (auto it = pendingRequests.begin(); it < pendingRequests.end();) { - auto req = *it; - - LOG_DEBUG(kRPC, << req->GetDebugString()); - - RetryAction retry = RetryAction::fail(""); // Default to fail - - if(connect_canceled_) { - retry = RetryAction::fail("Operation canceled"); - } else if (status.notWorthRetry()) { - retry = RetryAction::fail(status.ToString().c_str()); - } else if (retry_policy()) { - retry = retry_policy()->ShouldRetry(status, req->IncrementRetryCount(), req->get_failover_count(), true); - } - - if (retry.action == RetryAction::FAIL) { - // If we've exceeded the maximum retry, take the latest error and pass it - // on. There might be a good argument for caching the first error - // rather than the last one, that gets messy - - io_service().post([req, status]() { - req->OnResponseArrived(nullptr, status); // Never call back while holding a lock - }); - it = pendingRequests.erase(it); - } else { - if (!head_action) { - head_action = retry; - } - - ++it; - } - } - - // If we have reqests that need to be re-sent, ensure that we have a connection - // and send the requests to it - bool haveRequests = !pendingRequests.empty() && - head_action && head_action->action != RetryAction::FAIL; - - if (haveRequests) { - LOG_TRACE(kRPC, << "Have " << std::to_string(pendingRequests.size()) << " requests to resend"); - bool needNewConnection = !conn_; - if (needNewConnection) { - LOG_DEBUG(kRPC, << "Creating a new NN conection"); - - - // If HA is enabled and we have valid HA info then fail over to the standby (hopefully now active) - if(head_action->action == RetryAction::FAILOVER_AND_RETRY && ha_persisted_info_) { - - for(unsigned int i=0; i<pendingRequests.size();i++) { - pendingRequests[i]->IncrementFailoverCount(); - } - - ResolvedNamenodeInfo new_active_nn_info; - bool failoverInfoFound = ha_persisted_info_->GetFailoverAndUpdate(last_endpoints_, new_active_nn_info); - if(!failoverInfoFound) { - // This shouldn't be a common case, the set of endpoints was empty, likely due to DNS issues. - // Another possibility is a network device has been added or removed due to a VM starting or stopping. - - LOG_ERROR(kRPC, << "Failed to find endpoints for the alternate namenode." - << "Make sure Namenode hostnames can be found with a DNS lookup."); - // Kill all pending RPC requests since there's nowhere for this to go - Status badEndpointStatus = Status::Error("No endpoints found for namenode"); - - for(unsigned int i=0; i<pendingRequests.size(); i++) { - std::shared_ptr<Request> sharedCurrentRequest = pendingRequests[i]; - io_service().post([sharedCurrentRequest, badEndpointStatus]() { - sharedCurrentRequest->OnResponseArrived(nullptr, badEndpointStatus); // Never call back while holding a lock - }); - } - - // Clear request vector. This isn't a recoverable error. - pendingRequests.clear(); - } - - if(ha_persisted_info_->is_resolved()) { - LOG_INFO(kRPC, << "Going to try connecting to alternate Namenode: " << new_active_nn_info.uri.str()); - last_endpoints_ = new_active_nn_info.endpoints; - } else { - LOG_WARN(kRPC, << "It looks HA is turned on, but unable to fail over. has info=" - << ha_persisted_info_->is_enabled() << " resolved=" << ha_persisted_info_->is_resolved()); - } - } - - conn_ = InitializeConnection(); - conn_->PreEnqueueRequests(pendingRequests); - - if (head_action->delayMillis > 0) { - auto weak_conn = std::weak_ptr<RpcConnection>(conn_); - retry_timer.expires_from_now( - std::chrono::milliseconds(head_action->delayMillis)); - retry_timer.async_wait([this, weak_conn](asio::error_code ec) { - auto strong_conn = weak_conn.lock(); - if ( (!ec) && (strong_conn) ) { - strong_conn->ConnectAndFlush(last_endpoints_); - } - }); - } else { - conn_->ConnectAndFlush(last_endpoints_); - } - } else { - // We have an existing connection (which might be closed; we don't know - // until we hold the connection local) and should just add the new requests - conn_->AsyncRpc(pendingRequests); - } - } -} - - -void RpcEngine::SetFsEventCallback(fs_event_callback callback) { - event_handlers_->set_fs_callback(callback); -} - - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a542fb3/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h deleted file mode 100644 index 9f45fcf..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h +++ /dev/null @@ -1,190 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef LIB_RPC_RPC_ENGINE_H_ -#define LIB_RPC_RPC_ENGINE_H_ - -#include "hdfspp/options.h" -#include "hdfspp/status.h" - -#include "common/auth_info.h" -#include "common/retry_policy.h" -#include "common/libhdfs_events_impl.h" -#include "common/util.h" -#include "common/new_delete.h" -#include "common/namenode_info.h" -#include "namenode_tracker.h" - -#include <google/protobuf/message_lite.h> - -#include <asio/ip/tcp.hpp> -#include <asio/deadline_timer.hpp> - -#include <atomic> -#include <memory> -#include <vector> -#include <mutex> - -namespace hdfs { - - /* - * NOTE ABOUT LOCKING MODELS - * - * To prevent deadlocks, anything that might acquire multiple locks must - * acquire the lock on the RpcEngine first, then the RpcConnection. Callbacks - * will never be called while holding any locks, so the components are free - * to take locks when servicing a callback. - * - * An RpcRequest or RpcConnection should never call any methods on the RpcEngine - * except for those that are exposed through the LockFreeRpcEngine interface. - */ - -typedef const std::function<void(const Status &)> RpcCallback; - -class LockFreeRpcEngine; -class RpcConnection; -class SaslProtocol; -class RpcConnection; -class Request; - -/* - * These methods of the RpcEngine will never acquire locks, and are safe for - * RpcConnections to call while holding a ConnectionLock. - */ -class LockFreeRpcEngine { -public: - MEMCHECKED_CLASS(LockFreeRpcEngine) - - /* Enqueues a CommsError without acquiring a lock*/ - virtual void AsyncRpcCommsError(const Status &status, - std::shared_ptr<RpcConnection> failedConnection, - std::vector<std::shared_ptr<Request>> pendingRequests) = 0; - - - virtual const RetryPolicy *retry_policy() = 0; - virtual int NextCallId() = 0; - - virtual const std::string &client_name() = 0; - virtual const std::string &client_id() = 0; - virtual const std::string &user_name() = 0; - virtual const std::string &protocol_name() = 0; - virtual int protocol_version() = 0; - virtual ::asio::io_service &io_service() = 0; - virtual const Options &options() = 0; -}; - - -/* - * An engine for reliable communication with a NameNode. Handles connection, - * retry, and (someday) failover of the requested messages. - * - * Threading model: thread-safe. All callbacks will be called back from - * an asio pool and will not hold any internal locks - */ -class RpcEngine : public LockFreeRpcEngine, public std::enable_shared_from_this<RpcEngine> { - public: - MEMCHECKED_CLASS(RpcEngine) - enum { kRpcVersion = 9 }; - enum { - kCallIdAuthorizationFailed = -1, - kCallIdInvalid = -2, - kCallIdConnectionContext = -3, - kCallIdPing = -4, - kCallIdSasl = -33 - }; - - RpcEngine(::asio::io_service *io_service, const Options &options, - const std::string &client_name, const std::string &user_name, - const char *protocol_name, int protocol_version); - - void Connect(const std::string & cluster_name, - const std::vector<ResolvedNamenodeInfo> servers, - RpcCallback &handler); - - bool CancelPendingConnect(); - - void AsyncRpc(const std::string &method_name, - const ::google::protobuf::MessageLite *req, - const std::shared_ptr<::google::protobuf::MessageLite> &resp, - const std::function<void(const Status &)> &handler); - - void Shutdown(); - - /* Enqueues a CommsError without acquiring a lock*/ - void AsyncRpcCommsError(const Status &status, - std::shared_ptr<RpcConnection> failedConnection, - std::vector<std::shared_ptr<Request>> pendingRequests) override; - void RpcCommsError(const Status &status, - std::shared_ptr<RpcConnection> failedConnection, - std::vector<std::shared_ptr<Request>> pendingRequests); - - - const RetryPolicy * retry_policy() override { return retry_policy_.get(); } - int NextCallId() override { return ++call_id_; } - - void TEST_SetRpcConnection(std::shared_ptr<RpcConnection> conn); - void TEST_SetRetryPolicy(std::unique_ptr<const RetryPolicy> policy); - std::unique_ptr<const RetryPolicy> TEST_GenerateRetryPolicyUsingOptions(); - - const std::string &client_name() override { return client_name_; } - const std::string &client_id() override { return client_id_; } - const std::string &user_name() override { return auth_info_.getUser(); } - const std::string &protocol_name() override { return protocol_name_; } - int protocol_version() override { return protocol_version_; } - ::asio::io_service &io_service() override { return *io_service_; } - const Options &options() override { return options_; } - static std::string GetRandomClientName(); - - void SetFsEventCallback(fs_event_callback callback); -protected: - std::shared_ptr<RpcConnection> conn_; - std::shared_ptr<RpcConnection> InitializeConnection(); - virtual std::shared_ptr<RpcConnection> NewConnection(); - virtual std::unique_ptr<const RetryPolicy> MakeRetryPolicy(const Options &options); - - static std::string getRandomClientId(); - - // Remember all of the last endpoints in case we need to reconnect and retry - std::vector<::asio::ip::tcp::endpoint> last_endpoints_; - -private: - ::asio::io_service * const io_service_; - const Options options_; - const std::string client_name_; - const std::string client_id_; - const std::string protocol_name_; - const int protocol_version_; - std::unique_ptr<const RetryPolicy> retry_policy_; //null --> no retry - AuthInfo auth_info_; - std::string cluster_name_; - std::atomic_int call_id_; - ::asio::deadline_timer retry_timer; - - std::shared_ptr<LibhdfsEvents> event_handlers_; - - std::mutex engine_state_lock_; - - // Once Connect has been canceled there is no going back - bool connect_canceled_; - - // Keep endpoint info for all HA connections, a non-null ptr indicates - // that HA info was found in the configuation. - std::unique_ptr<HANamenodeTracker> ha_persisted_info_; -}; -} - -#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a542fb3/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_engine.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_engine.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_engine.cc deleted file mode 100644 index c5b90f0..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_engine.cc +++ /dev/null @@ -1,80 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <sstream> -#include <string.h> // memcpy() - -#include "sasl_engine.h" -#include "common/logging.h" - -namespace hdfs { - -/***************************************************************************** - * SASL ENGINE BASE CLASS - */ - -SaslEngine::State SaslEngine::GetState() -{ - return state_; -} - -SaslEngine::~SaslEngine() { -} - -Status SaslEngine::SetKerberosInfo(const std::string &principal) -{ - principal_ = principal; - return Status::OK(); -} - -Status SaslEngine::SetPasswordInfo(const std::string &id, - const std::string &password) -{ - id_ = id; - password_ = password; - return Status::OK(); -} - -bool SaslEngine::ChooseMech(const std::vector<SaslMethod> &resp_auths) { - Status status = Status::OK(); - - if (resp_auths.empty()) return false; - - for (SaslMethod auth: resp_auths) { - if ( auth.mechanism != "GSSAPI") continue; // Hack: only GSSAPI for now - - // do a proper deep copy of the vector element - // that we like, because the original v ector will go away: - chosen_mech_.mechanism = auth.mechanism; - chosen_mech_.protocol = auth.protocol; - chosen_mech_.serverid = auth.serverid; - chosen_mech_.challenge = auth.challenge; - - return auth.mechanism.c_str(); - } - - state_ = kErrorState; - status = Status::Error("SaslEngine::chooseMech(): No good protocol."); - - // Clear out the chosen mech - chosen_mech_ = SaslMethod(); - - return false; -} // choose_mech() - -} // namespace hdfs http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a542fb3/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_engine.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_engine.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_engine.h deleted file mode 100644 index 6c82ccd..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_engine.h +++ /dev/null @@ -1,104 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef LIB_RPC_SASLENGINE_H -#define LIB_RPC_SASLENGINE_H - -#include "hdfspp/status.h" -#include "common/optional_wrapper.h" - -#include <vector> - -namespace hdfs { - -class SaslProtocol; - -template <class T> -using optional = std::experimental::optional<T>; - -class SaslMethod { -public: - std::string protocol; - std::string mechanism; - std::string serverid; - std::string challenge; -}; - -class SaslEngine { -public: - enum State { - kUnstarted, - kWaitingForData, - kSuccess, - kFailure, - kErrorState, - }; - - // State transitions: - // \--------------------------/ - // kUnstarted --start--> kWaitingForData --step-+--> kSuccess --finish--v - // \-> kFailure -/ - - // State transitions: - // \--------------------------/ - // kUnstarted --start--> kWaitingForData --step-+--> kSuccess --finish--v - // \-> kFailure -/ - - SaslEngine(): state_ (kUnstarted) {} - virtual ~SaslEngine(); - - // Must be called when state is kUnstarted - Status SetKerberosInfo(const std::string &principal); - // Must be called when state is kUnstarted - Status SetPasswordInfo(const std::string &id, - const std::string &password); - - // Choose a mechanism from the available ones. Will set the - // chosen_mech_ member and return true if we found one we - // can process - bool ChooseMech(const std::vector<SaslMethod> &avail_auths); - - // Returns the current state - State GetState(); - - // Must be called when state is kUnstarted - virtual std::pair<Status,std::string> Start() = 0; - - // Must be called when state is kWaitingForData - // Returns kOK and any data that should be sent to the server - virtual std::pair<Status,std::string> Step(const std::string data) = 0; - - // Must only be called when state is kSuccess, kFailure, or kErrorState - virtual Status Finish() = 0; - - // main repository of generic Sasl config data: - SaslMethod chosen_mech_; -protected: - State state_; - SaslProtocol * sasl_protocol_; - - optional<std::string> principal_; - optional<std::string> realm_; - optional<std::string> id_; - optional<std::string> password_; - -}; // class SaslEngine - -} // namespace hdfs - -#endif /* LIB_RPC_SASLENGINE_H */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a542fb3/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_protocol.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_protocol.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_protocol.cc deleted file mode 100644 index 0957ea3..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_protocol.cc +++ /dev/null @@ -1,407 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "rpc_engine.h" -#include "rpc_connection.h" -#include "common/logging.h" -#include "common/optional_wrapper.h" - -#include "sasl_engine.h" -#include "sasl_protocol.h" - -#if defined USE_SASL - #if defined USE_CYRUS_SASL - #include "cyrus_sasl_engine.h" // CySaslEngine() - #elif defined USE_GSASL - #include "gsasl_engine.h" // GSaslEngine() - #else - #error USE_SASL defined but no engine (USE_GSASL) defined - #endif -#endif - -namespace hdfs { - -using namespace hadoop::common; -using namespace google::protobuf; - -/***** - * Threading model: all entry points need to acquire the sasl_lock before accessing - * members of the class - * - * Lifecycle model: asio may have outstanding callbacks into this class for arbitrary - * amounts of time, so any references to the class must be shared_ptr's. The - * SASLProtocol keeps a weak_ptr to the owning RpcConnection, which might go away, - * so the weak_ptr should be locked only long enough to make callbacks into the - * RpcConnection. - */ - -SaslProtocol::SaslProtocol(const std::string & cluster_name, - const AuthInfo & auth_info, - std::shared_ptr<RpcConnection> connection) : - state_(kUnstarted), - cluster_name_(cluster_name), - auth_info_(auth_info), - connection_(connection) -{ -} - -SaslProtocol::~SaslProtocol() -{ - assert(state_ != kNegotiate); -} - -void SaslProtocol::SetEventHandlers(std::shared_ptr<LibhdfsEvents> event_handlers) { - std::lock_guard<std::mutex> state_lock(sasl_state_lock_); - event_handlers_ = event_handlers; -} // SetEventHandlers() method - -void SaslProtocol::Authenticate(std::function<void(const Status & status, const AuthInfo new_auth_info)> callback) -{ - std::lock_guard<std::mutex> state_lock(sasl_state_lock_); - - callback_ = callback; - state_ = kNegotiate; - event_handlers_->call("SASL Start", cluster_name_.c_str(), 0); - - std::shared_ptr<RpcSaslProto> req_msg = std::make_shared<RpcSaslProto>(); - req_msg->set_state(RpcSaslProto_SaslState_NEGOTIATE); - - // We cheat here since this is always called while holding the RpcConnection's lock - std::shared_ptr<RpcConnection> connection = connection_.lock(); - if (!connection) { - AuthComplete(Status::AuthenticationFailed("Lost RPC Connection"), AuthInfo()); - return; - } - - std::shared_ptr<RpcSaslProto> resp_msg = std::make_shared<RpcSaslProto>(); - auto self(shared_from_this()); - connection->AsyncRpc_locked(SASL_METHOD_NAME, req_msg.get(), resp_msg, - [self, req_msg, resp_msg] (const Status & status) { - self->OnServerResponse(status, resp_msg.get()); } ); -} // authenticate() method - -AuthInfo::AuthMethod ParseMethod(const std::string & method) - { - if (0 == strcasecmp(method.c_str(), "SIMPLE")) { - return AuthInfo::kSimple; - } - else if (0 == strcasecmp(method.c_str(), "KERBEROS")) { - return AuthInfo::kKerberos; - } - else if (0 == strcasecmp(method.c_str(), "TOKEN")) { - return AuthInfo::kToken; - } - else { - return AuthInfo::kUnknownAuth; - } -} // ParseMethod() - -// build_init_msg(): -// Helper function for Start(), to keep -// these ProtoBuf-RPC calls out of the sasl_engine code. - -std::pair<Status, RpcSaslProto> -SaslProtocol::BuildInitMessage(std::string & token, const hadoop::common::RpcSaslProto * negotiate_msg) -{ - // The init message needs one of the RpcSaslProto_SaslAuth structures that - // was sent in the negotiate message as our chosen mechanism - // Map the chosen_mech name back to the RpcSaslProto_SaslAuth from the negotiate - // message that it corresponds to - SaslMethod & chosenMech = sasl_engine_->chosen_mech_; - auto auths = negotiate_msg->auths(); - auto pb_auth_it = std::find_if(auths.begin(), auths.end(), - [chosenMech](const RpcSaslProto_SaslAuth & data) - { - return data.mechanism() == chosenMech.mechanism; - }); - if (pb_auth_it == auths.end()) - return std::make_pair(Status::Error("Couldn't find mechanism in negotiate msg"), RpcSaslProto()); - auto & pb_auth = *pb_auth_it; - - // Prepare INITIATE message - RpcSaslProto initiate = RpcSaslProto(); - - initiate.set_state(RpcSaslProto_SaslState_INITIATE); - // initiate message will contain: - // token_ (binary data), and - // auths_[ ], an array of objects just like pb_auth. - // In our case, we want the auths array - // to hold just the single element from pb_auth: - - RpcSaslProto_SaslAuth * respAuth = initiate.add_auths(); - respAuth->CopyFrom(pb_auth); - - // Mostly, an INITIATE message contains a "Token". - // For GSSAPI, the token is a Kerberos AP_REQ, aka - // "Authenticated application request," comprising - // the client's application ticket & and an encrypted - // message that Kerberos calls an "authenticator". - - if (token.empty()) { - const char * errmsg = "SaslProtocol::build_init_msg(): No token available."; - LOG_ERROR(kRPC, << errmsg); - return std::make_pair(Status::Error(errmsg), RpcSaslProto()); - } - - // add challenge token to the INITIATE message: - initiate.set_token(token); - - // the initiate message is ready to send: - return std::make_pair(Status::OK(), initiate); -} // build_init_msg() - -// Converts the RpcSaslProto.auths ararray from RpcSaslProto_SaslAuth PB -// structures to SaslMethod structures -static bool -extract_auths(std::vector<SaslMethod> & resp_auths, - const hadoop::common::RpcSaslProto * response) { - - bool simple_avail = false; - auto pb_auths = response->auths(); - - // For our GSSAPI case, an element of pb_auths contains: - // method_ = "KERBEROS" - // mechanism_ = "GSSAPI" - // protocol_ = "nn" /* "name node", AKA "hdfs" - // serverid_ = "foobar1.acmecorp.com" - // challenge_ = "" - // _cached_size_ = 0 - // _has_bits_ = 15 - - for (int i = 0; i < pb_auths.size(); ++i) { - auto pb_auth = pb_auths.Get(i); - AuthInfo::AuthMethod method = ParseMethod(pb_auth.method()); - - switch(method) { - case AuthInfo::kToken: - case AuthInfo::kKerberos: { - SaslMethod new_method; - new_method.mechanism = pb_auth.mechanism(); - new_method.protocol = pb_auth.protocol(); - new_method.serverid = pb_auth.serverid(); - new_method.challenge = pb_auth.has_challenge() ? - pb_auth.challenge() : ""; - resp_auths.push_back(new_method); - } - break; - case AuthInfo::kSimple: - simple_avail = true; - break; - case AuthInfo::kUnknownAuth: - LOG_WARN(kRPC, << "Unknown auth method " << pb_auth.method() << "; ignoring"); - break; - default: - LOG_WARN(kRPC, << "Invalid auth type: " << method << "; ignoring"); - break; - } - } // for - return simple_avail; -} // extract_auths() - -void SaslProtocol::ResetEngine() { -#if defined USE_SASL - #if defined USE_CYRUS_SASL - sasl_engine_.reset(new CySaslEngine()); - #elif defined USE_GSASL - sasl_engine_.reset(new GSaslEngine()); - #else - #error USE_SASL defined but no engine (USE_GSASL) defined - #endif -#endif - return; -} // Reset_Engine() method - -void SaslProtocol::Negotiate(const hadoop::common::RpcSaslProto * response) -{ - this->ResetEngine(); // get a new SaslEngine - - if (auth_info_.getToken()) { - sasl_engine_->SetPasswordInfo(auth_info_.getToken().value().identifier, - auth_info_.getToken().value().password); - } - sasl_engine_->SetKerberosInfo(auth_info_.getUser()); // TODO: map to principal? - - // Copy the response's auths list to an array of SaslMethod objects. - // SaslEngine shouldn't need to know about the protobuf classes. - std::vector<SaslMethod> resp_auths; - bool simple_available = extract_auths(resp_auths, response); - bool mech_chosen = sasl_engine_->ChooseMech(resp_auths); - - if (mech_chosen) { - - // Prepare an INITIATE message, - // later on we'll send it to the hdfs server: - auto start_result = sasl_engine_->Start(); - Status status = start_result.first; - if (! status.ok()) { - // start() failed, simple isn't avail, - // so give up & stop authentication: - AuthComplete(status, auth_info_); - return; - } - // token.second is a binary buffer, containing - // client credentials that will prove the - // client's identity to the application server. - // Put the token into an INITIATE message: - auto init = BuildInitMessage(start_result.second, response); - - // If all is OK, send the INITIATE msg to the hdfs server; - // Otherwise, if possible, fail over to simple authentication: - status = init.first; - if (status.ok()) { - SendSaslMessage(init.second); - return; - } - if (!simple_available) { - // build_init_msg() failed, simple isn't avail, - // so give up & stop authentication: - AuthComplete(status, auth_info_); - return; - } - // If simple IS available, fall through to below, - // but without build_init_msg()'s failure-status. - } - - // There were no resp_auths, or the SaslEngine couldn't make one work - if (simple_available) { - // Simple was the only one we could use. That's OK. - AuthComplete(Status::OK(), auth_info_); - return; - } else { - // We didn't understand any of the resp_auths; - // Give back some information - std::stringstream ss; - ss << "Client cannot authenticate via: "; - - auto pb_auths = response->auths(); - for (int i = 0; i < pb_auths.size(); ++i) { - const RpcSaslProto_SaslAuth & pb_auth = pb_auths.Get(i); - ss << pb_auth.mechanism() << ", "; - } - - AuthComplete(Status::Error(ss.str().c_str()), auth_info_); - return; - } -} // Negotiate() method - -void SaslProtocol::Challenge(const hadoop::common::RpcSaslProto * challenge) -{ - if (!sasl_engine_) { - AuthComplete(Status::Error("Received challenge before negotiate"), auth_info_); - return; - } - - RpcSaslProto response; - response.CopyFrom(*challenge); - response.set_state(RpcSaslProto_SaslState_RESPONSE); - - std::string challenge_token = challenge->has_token() ? challenge->token() : ""; - auto sasl_response = sasl_engine_->Step(challenge_token); - - if (sasl_response.first.ok()) { - response.set_token(sasl_response.second); - - std::shared_ptr<RpcSaslProto> return_msg = std::make_shared<RpcSaslProto>(); - SendSaslMessage(response); - } else { - AuthComplete(sasl_response.first, auth_info_); - return; - } -} // Challenge() method - -bool SaslProtocol::SendSaslMessage(RpcSaslProto & message) -{ - assert(lock_held(sasl_state_lock_)); // Must be holding lock before calling - - // RpcConnection might have been freed when we weren't looking. Lock it - // to make sure it's there long enough for us - std::shared_ptr<RpcConnection> connection = connection_.lock(); - if (!connection) { - LOG_DEBUG(kRPC, << "Tried sending a SASL Message but the RPC connection was gone"); - AuthComplete(Status::AuthenticationFailed("Lost RPC Connection"), AuthInfo()); - return false; - } - - std::shared_ptr<RpcSaslProto> resp_msg = std::make_shared<RpcSaslProto>(); - auto self(shared_from_this()); - connection->AsyncRpc(SASL_METHOD_NAME, &message, resp_msg, - [self, resp_msg] (const Status & status) { - self->OnServerResponse(status, resp_msg.get()); - } ); - - return true; -} // SendSaslMessage() method - -// AuthComplete(): stop the auth effort, successful ot not: -bool SaslProtocol::AuthComplete(const Status & status, const AuthInfo & auth_info) -{ - assert(lock_held(sasl_state_lock_)); // Must be holding lock before calling - state_ = kComplete; - event_handlers_->call("SASL End", cluster_name_.c_str(), 0); - - // RpcConnection might have been freed when we weren't looking. Lock it - // to make sure it's there long enough for us - std::shared_ptr<RpcConnection> connection = connection_.lock(); - if (!connection) { - LOG_DEBUG(kRPC, << "Tried sending an AuthComplete but the RPC connection was gone: " << status.ToString()); - return false; - } - - LOG_TRACE(kRPC, << "Received SASL response" << status.ToString()); - connection->AuthComplete(status, auth_info); - - return true; -} // AuthComplete() method - -void SaslProtocol::OnServerResponse(const Status & status, const hadoop::common::RpcSaslProto * response) -{ - std::lock_guard<std::mutex> state_lock(sasl_state_lock_); - LOG_TRACE(kRPC, << "Received SASL response: " << status.ToString()); - - if (status.ok()) { - switch(response->state()) { - case RpcSaslProto_SaslState_NEGOTIATE: - Negotiate(response); - break; - case RpcSaslProto_SaslState_CHALLENGE: - Challenge(response); - break; - case RpcSaslProto_SaslState_SUCCESS: - if (sasl_engine_) { - sasl_engine_->Finish(); - } - AuthComplete(Status::OK(), auth_info_); - break; - - case RpcSaslProto_SaslState_INITIATE: // Server side only - case RpcSaslProto_SaslState_RESPONSE: // Server side only - case RpcSaslProto_SaslState_WRAP: - LOG_ERROR(kRPC, << "Invalid client-side SASL state: " << response->state()); - AuthComplete(Status::Error("Invalid client-side state"), auth_info_); - break; - default: - LOG_ERROR(kRPC, << "Unknown client-side SASL state: " << response->state()); - AuthComplete(Status::Error("Unknown client-side state"), auth_info_); - break; - } - } else { - AuthComplete(status, auth_info_); - } -} // OnServerResponse() method - -} // namespace hdfs http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a542fb3/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_protocol.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_protocol.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_protocol.h deleted file mode 100644 index a46ae08..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_protocol.h +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef LIB_RPC_SASLPROTOCOL_H -#define LIB_RPC_SASLPROTOCOL_H - -#include <memory> -#include <mutex> -#include <functional> - -#include <RpcHeader.pb.h> - -#include "hdfspp/status.h" -#include "common/auth_info.h" -#include "common/libhdfs_events_impl.h" - -namespace hdfs { - -static constexpr const char * SASL_METHOD_NAME = "sasl message"; - -class RpcConnection; -class SaslEngine; -class SaslMethod; - -class SaslProtocol : public std::enable_shared_from_this<SaslProtocol> -{ -public: - SaslProtocol(const std::string &cluster_name, - const AuthInfo & auth_info, - std::shared_ptr<RpcConnection> connection); - virtual ~SaslProtocol(); - - void SetEventHandlers(std::shared_ptr<LibhdfsEvents> event_handlers); - - // Start the async authentication process. Must be called while holding the - // connection lock, but all callbacks will occur outside of the connection lock - void Authenticate(std::function<void(const Status & status, const AuthInfo new_auth_info)> callback); - void OnServerResponse(const Status & status, const hadoop::common::RpcSaslProto * response); - std::pair<Status, hadoop::common::RpcSaslProto> BuildInitMessage( std::string & token, const hadoop::common::RpcSaslProto * negotiate_msg); -private: - enum State { - kUnstarted, - kNegotiate, - kComplete - }; - - // Lock for access to members of the class - std::mutex sasl_state_lock_; - - State state_; - const std::string cluster_name_; - AuthInfo auth_info_; - std::weak_ptr<RpcConnection> connection_; - std::function<void(const Status & status, const AuthInfo new_auth_info)> callback_; - std::unique_ptr<SaslEngine> sasl_engine_; - std::shared_ptr<LibhdfsEvents> event_handlers_; - - bool SendSaslMessage(hadoop::common::RpcSaslProto & message); - bool AuthComplete(const Status & status, const AuthInfo & auth_info); - - void ResetEngine(); - void Negotiate(const hadoop::common::RpcSaslProto * response); - void Challenge(const hadoop::common::RpcSaslProto * response); - -}; // class SaslProtocol - -} // namespace hdfs - -#endif /* LIB_RPC_SASLPROTOCOL_H */ --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org