Repository: hadoop Updated Branches: refs/heads/HDFS-8707 c2e1e23d8 -> e1f73feef
HDFS-10231: libhdfs++: Fix race conditions in RPC layer. Contributed by Bob Hansen. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e1f73fee Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e1f73fee Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e1f73fee Branch: refs/heads/HDFS-8707 Commit: e1f73feefb02544e4104e21b5390b965140775a8 Parents: c2e1e23 Author: Bob Hansen <b...@hp.com> Authored: Mon Apr 4 08:53:52 2016 -0700 Committer: Bob Hansen <b...@hp.com> Committed: Mon Apr 4 08:53:52 2016 -0700 ---------------------------------------------------------------------- .../native/libhdfspp/lib/rpc/rpc_connection.cc | 84 +++++++++++++------- .../native/libhdfspp/lib/rpc/rpc_connection.h | 52 +++++++++--- .../main/native/libhdfspp/lib/rpc/rpc_engine.cc | 83 +++++++++---------- .../main/native/libhdfspp/lib/rpc/rpc_engine.h | 23 +++--- .../native/libhdfspp/tests/rpc_engine_test.cc | 12 +-- 5 files changed, 158 insertions(+), 96 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1f73fee/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc index bed3347..6c3b82e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc @@ -158,15 +158,17 @@ void Request::OnResponseArrived(pbio::CodedInputStream *is, RpcConnection::RpcConnection(LockFreeRpcEngine *engine) : engine_(engine), - connected_(false) {} + connected_(kNotYetConnected) 
{} ::asio::io_service &RpcConnection::io_service() { return engine_->io_service(); } void RpcConnection::StartReading() { - io_service().post(std::bind(&RpcConnection::OnRecvCompleted, this, - ::asio::error_code(), 0)); + auto shared_this = shared_from_this(); + io_service().post([shared_this, this] () { + OnRecvCompleted(::asio::error_code(), 0); + }); } void RpcConnection::AsyncFlushPendingRequests() { @@ -174,6 +176,8 @@ void RpcConnection::AsyncFlushPendingRequests() { io_service().post([shared_this, this]() { std::lock_guard<std::mutex> state_lock(connection_state_lock_); + LOG_TRACE(kRPC, << "RpcConnection::AsyncRpc called (connected=" << ToString(connected_) << ")"); + if (!request_over_the_wire_) { FlushPendingRequests(); } @@ -281,40 +285,53 @@ void RpcConnection::AsyncRpc( auto r = std::make_shared<Request>(engine_, method_name, req, std::move(wrapped_handler)); - pending_requests_.push_back(r); - FlushPendingRequests(); -} -void RpcConnection::AsyncRawRpc(const std::string &method_name, - const std::string &req, - std::shared_ptr<std::string> resp, - RpcCallback &&handler) { - std::lock_guard<std::mutex> state_lock(connection_state_lock_); + if (connected_ == kDisconnected) { + // Oops. The connection failed _just_ before the engine got a chance + // to send it. 
Register it as a failure + Status status = Status::ResourceUnavailable("RpcConnection closed before send."); + auto r_vector = std::vector<std::shared_ptr<Request> > (1, r); + assert(r_vector[0].get() != nullptr); - std::shared_ptr<RpcConnection> shared_this = shared_from_this(); - auto wrapped_handler = [shared_this, this, resp, handler]( - pbio::CodedInputStream *is, const Status &status) { - if (status.ok()) { - uint32_t size = 0; - is->ReadVarint32(&size); - auto limit = is->PushLimit(size); - is->ReadString(resp.get(), limit); - is->PopLimit(limit); + engine_->AsyncRpcCommsError(status, shared_from_this(), r_vector); + } else { + pending_requests_.push_back(r); + + if (connected_ == kConnected) { // Dont flush if we're waiting or handshaking + FlushPendingRequests(); } - handler(status); - }; + } +} - auto r = std::make_shared<Request>(engine_, method_name, req, - std::move(wrapped_handler)); - pending_requests_.push_back(r); - FlushPendingRequests(); +void RpcConnection::AsyncRpc(const std::vector<std::shared_ptr<Request> > & requests) { + std::lock_guard<std::mutex> state_lock(connection_state_lock_); + LOG_TRACE(kRPC, << "RpcConnection::AsyncRpc[] called; connected=" << ToString(connected_)); + + if (connected_ == kDisconnected) { + // Oops. The connection failed _just_ before the engine got a chance + // to send it. 
Register it as a failure + Status status = Status::ResourceUnavailable("RpcConnection closed before send."); + engine_->AsyncRpcCommsError(status, shared_from_this(), requests); + } else { + pending_requests_.reserve(pending_requests_.size() + requests.size()); + for (auto r: requests) { + pending_requests_.push_back(r); + } + if (connected_ == kConnected) { // Dont flush if we're waiting or handshaking + FlushPendingRequests(); + } + } } + void RpcConnection::PreEnqueueRequests( std::vector<std::shared_ptr<Request>> requests) { // Public method - acquire lock std::lock_guard<std::mutex> state_lock(connection_state_lock_); - assert(!connected_); + + LOG_DEBUG(kRPC, << "RpcConnection::PreEnqueueRequests called"); + + assert(connected_ == kNotYetConnected); pending_requests_.insert(pending_requests_.end(), requests.begin(), requests.end()); @@ -349,7 +366,7 @@ void RpcConnection::CommsError(const Status &status) { std::make_move_iterator(pending_requests_.end())); pending_requests_.clear(); - engine_->AsyncRpcCommsError(status, requestsToReturn); + engine_->AsyncRpcCommsError(status, shared_from_this(), requestsToReturn); } void RpcConnection::ClearAndDisconnect(const ::asio::error_code &ec) { @@ -379,4 +396,15 @@ std::shared_ptr<Request> RpcConnection::RemoveFromRunningQueue(int call_id) { requests_on_fly_.erase(it); return req; } + +std::string RpcConnection::ToString(ConnectedState connected) { + switch(connected) { + case kNotYetConnected: return "NotYetConnected"; + case kConnecting: return "Connecting"; + case kConnected: return "Connected"; + case kDisconnected: return "Disconnected"; + default: return "Invalid ConnectedState"; + } +} + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1f73fee/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h index cab14fa..4c33a41 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h @@ -34,6 +34,8 @@ template <class NextLayer> class RpcConnectionImpl : public RpcConnection { public: RpcConnectionImpl(RpcEngine *engine); + virtual ~RpcConnectionImpl() override; + virtual void Connect(const std::vector<::asio::ip::tcp::endpoint> &server, RpcCallback &handler); virtual void ConnectAndFlush( @@ -49,7 +51,7 @@ public: NextLayer &next_layer() { return next_layer_; } - void TEST_set_connected(bool new_value) { connected_ = new_value; } + void TEST_set_connected(bool connected) { connected_ = connected ? kConnected : kNotYetConnected; } private: const Options options_; @@ -66,7 +68,19 @@ RpcConnectionImpl<NextLayer>::RpcConnectionImpl(RpcEngine *engine) options_(engine->options()), next_layer_(engine->io_service()) { LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called"); - } +} + +template <class NextLayer> +RpcConnectionImpl<NextLayer>::~RpcConnectionImpl() { + LOG_DEBUG(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called &" << (void*)this); + + std::lock_guard<std::mutex> state_lock(connection_state_lock_); + if (pending_requests_.size() > 0) + LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the pending queue"); + if (requests_on_fly_.size() > 0) + LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the requests_on_fly queue"); +} + template <class NextLayer> void RpcConnectionImpl<NextLayer>::Connect( @@ -93,6 +107,16 @@ void RpcConnectionImpl<NextLayer>::ConnectAndFlush( return; } + if (connected_ == kConnected) { + FlushPendingRequests(); + return; + 
} + if (connected_ != kNotYetConnected) { + LOG_WARN(kRPC, << "RpcConnectionImpl::ConnectAndFlush called while connected=" << ToString(connected_)); + return; + } + connected_ = kConnecting; + // Take the first endpoint, but remember the alternatives for later additional_endpoints_ = server; ::asio::ip::tcp::endpoint first_endpoint = additional_endpoints_.front(); @@ -169,8 +193,8 @@ void RpcConnectionImpl<NextLayer>::Handshake(RpcCallback &handler) { [handshake_packet, handler, shared_this, this]( const ::asio::error_code &ec, size_t) { Status status = ToStatus(ec); - if (status.ok()) { - connected_ = true; + if (status.ok() && connected_ == kConnecting) { + connected_ = kConnected; } handler(status); }); @@ -208,9 +232,13 @@ void RpcConnectionImpl<NextLayer>::FlushPendingRequests() { return; } - if (!connected_) { + if (connected_ == kDisconnected) { + LOG_WARN(kRPC, << "RpcConnectionImpl::FlushPendingRequests attempted to flush a disconnected connection"); return; } + if (connected_ != kConnected) { + LOG_DEBUG(kRPC, << "RpcConnectionImpl::FlushPendingRequests attempted to flush a " << ToString(connected_) << " connection"); + } // Don't send if we don't need to if (request_over_the_wire_) { @@ -218,19 +246,25 @@ void RpcConnectionImpl<NextLayer>::FlushPendingRequests() { } std::shared_ptr<Request> req = pending_requests_.front(); + auto weak_req = std::weak_ptr<Request>(req); pending_requests_.erase(pending_requests_.begin()); std::shared_ptr<RpcConnection> shared_this = shared_from_this(); + auto weak_this = std::weak_ptr<RpcConnection>(shared_this); std::shared_ptr<std::string> payload = std::make_shared<std::string>(); req->GetPacket(payload.get()); if (!payload->empty()) { + assert(requests_on_fly_.find(req->call_id()) == requests_on_fly_.end()); requests_on_fly_[req->call_id()] = req; request_over_the_wire_ = req; req->timer().expires_from_now( std::chrono::milliseconds(options_.rpc_timeout)); - req->timer().async_wait([shared_this, this, req](const 
::asio::error_code &ec) { - this->HandleRpcTimeout(req, ec); + req->timer().async_wait([weak_this, weak_req, this](const ::asio::error_code &ec) { + auto timeout_this = weak_this.lock(); + auto timeout_req = weak_req.lock(); + if (timeout_this && timeout_req) + this->HandleRpcTimeout(timeout_req, ec); }); asio::async_write(next_layer_, asio::buffer(*payload), @@ -320,11 +354,11 @@ void RpcConnectionImpl<NextLayer>::Disconnect() { LOG_INFO(kRPC, << "RpcConnectionImpl::Disconnect called"); request_over_the_wire_.reset(); - if (connected_) { + if (connected_ == kConnecting || connected_ == kConnected) { next_layer_.cancel(); next_layer_.close(); } - connected_ = false; + connected_ = kDisconnected; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1f73fee/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc index 70b50cf..132bb69 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc @@ -61,7 +61,6 @@ void RpcEngine::Shutdown() { LOG_DEBUG(kRPC, << "RpcEngine::Shutdown called"); io_service_->post([this]() { std::lock_guard<std::mutex> state_lock(engine_state_lock_); - conn_->Disconnect(); conn_.reset(); }); } @@ -122,47 +121,33 @@ std::shared_ptr<RpcConnection> RpcEngine::InitializeConnection() return result; } - -Status RpcEngine::RawRpc(const std::string &method_name, const std::string &req, - std::shared_ptr<std::string> resp) { - LOG_TRACE(kRPC, << "RpcEngine::RawRpc called"); - - std::shared_ptr<RpcConnection> conn; - { - std::lock_guard<std::mutex> state_lock(engine_state_lock_); - if (!conn_) { - 
conn_ = InitializeConnection(); - conn_->ConnectAndFlush(last_endpoints_); - } - conn = conn_; - } - - auto stat = std::make_shared<std::promise<Status>>(); - std::future<Status> future(stat->get_future()); - conn->AsyncRawRpc(method_name, req, resp, - [stat](const Status &status) { stat->set_value(status); }); - return future.get(); -} - void RpcEngine::AsyncRpcCommsError( const Status &status, + std::shared_ptr<RpcConnection> failedConnection, std::vector<std::shared_ptr<Request>> pendingRequests) { - LOG_ERROR(kRPC, << "RpcEngine::AsyncRpcCommsError called"); + LOG_ERROR(kRPC, << "RpcEngine::AsyncRpcCommsError called; conn=" << failedConnection.get() << " reqs=" << pendingRequests.size()); - io_service().post([this, status, pendingRequests]() { - RpcCommsError(status, pendingRequests); + io_service().post([this, status, failedConnection, pendingRequests]() { + RpcCommsError(status, failedConnection, pendingRequests); }); } void RpcEngine::RpcCommsError( const Status &status, + std::shared_ptr<RpcConnection> failedConnection, std::vector<std::shared_ptr<Request>> pendingRequests) { (void)status; - LOG_ERROR(kRPC, << "RpcEngine::RpcCommsError called"); + LOG_ERROR(kRPC, << "RpcEngine::RpcCommsError called; conn=" << failedConnection.get() << " reqs=" << pendingRequests.size()); std::lock_guard<std::mutex> state_lock(engine_state_lock_); + // If the failed connection is the current one, shut it down + // It will be reconnected when there is work to do + if (failedConnection == conn_) { + conn_.reset(); + } + auto head_action = optional<RetryAction>(); // Filter out anything with too many retries already @@ -192,25 +177,35 @@ void RpcEngine::RpcCommsError( } } - // Close the connection and retry and requests that might have been sent to - // the NN - if (!pendingRequests.empty() && - head_action && head_action->action != RetryAction::FAIL) { - conn_ = InitializeConnection(); - - conn_->PreEnqueueRequests(pendingRequests); - if (head_action->delayMillis > 0) { - 
retry_timer.expires_from_now( - std::chrono::milliseconds(options_.rpc_retry_delay_ms)); - retry_timer.async_wait([this](asio::error_code ec) { - if (!ec) conn_->ConnectAndFlush(last_endpoints_); - }); + // If we have requests that need to be re-sent, ensure that we have a connection + // and send the requests to it + bool haveRequests = !pendingRequests.empty() && + head_action && head_action->action != RetryAction::FAIL; + + if (haveRequests) { + bool needNewConnection = !conn_; + if (needNewConnection) { + conn_ = InitializeConnection(); + conn_->PreEnqueueRequests(pendingRequests); + + if (head_action->delayMillis > 0) { + auto weak_conn = std::weak_ptr<RpcConnection>(conn_); + retry_timer.expires_from_now( + std::chrono::milliseconds(options_.rpc_retry_delay_ms)); + retry_timer.async_wait([this, weak_conn](asio::error_code ec) { + auto strong_conn = weak_conn.lock(); + if ( (!ec) && (strong_conn) ) { + strong_conn->ConnectAndFlush(last_endpoints_); + } + }); + } else { + conn_->ConnectAndFlush(last_endpoints_); + } } else { - conn_->ConnectAndFlush(last_endpoints_); + // We have an existing connection (which might be closed; we don't know + // until we hold the connection local) and should just add the new requests + conn_->AsyncRpc(pendingRequests); } - } else { - // Connection will try again if someone calls AsyncRpc - conn_.reset(); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1f73fee/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h index 7b66ac0..8ea6e8d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h +++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h @@ -125,8 +125,7 @@ class RpcConnection : public std::enable_shared_from_this<RpcConnection> { std::shared_ptr<::google::protobuf::MessageLite> resp, const RpcCallback &handler); - void AsyncRawRpc(const std::string &method_name, const std::string &request, - std::shared_ptr<std::string> resp, RpcCallback &&handler); + void AsyncRpc(const std::vector<std::shared_ptr<Request> > & requests); // Enqueue requests before the connection is connected. Will be flushed // on connect @@ -182,7 +181,15 @@ class RpcConnection : public std::enable_shared_from_this<RpcConnection> { // Connection can have deferred connection, especially when we're pausing // during retry - bool connected_; + enum ConnectedState { + kNotYetConnected, + kConnecting, + kConnected, + kDisconnected + }; + static std::string ToString(ConnectedState connected); + + ConnectedState connected_; // The request being sent over the wire; will also be in requests_on_fly_ std::shared_ptr<Request> request_over_the_wire_; // Requests to be sent over the wire @@ -207,6 +214,7 @@ class LockFreeRpcEngine { public: /* Enqueues a CommsError without acquiring a lock*/ virtual void AsyncRpcCommsError(const Status &status, + std::shared_ptr<RpcConnection> failedConnection, std::vector<std::shared_ptr<Request>> pendingRequests) = 0; @@ -254,19 +262,16 @@ class RpcEngine : public LockFreeRpcEngine { Status Rpc(const std::string &method_name, const ::google::protobuf::MessageLite *req, const std::shared_ptr<::google::protobuf::MessageLite> &resp); - /** - * Send raw bytes as RPC payload. This is intended to be used in JNI - * bindings only. 
- **/ - Status RawRpc(const std::string &method_name, const std::string &req, - std::shared_ptr<std::string> resp); + void Start(); void Shutdown(); /* Enqueues a CommsError without acquiring a lock*/ void AsyncRpcCommsError(const Status &status, + std::shared_ptr<RpcConnection> failedConnection, std::vector<std::shared_ptr<Request>> pendingRequests) override; void RpcCommsError(const Status &status, + std::shared_ptr<RpcConnection> failedConnection, std::vector<std::shared_ptr<Request>> pendingRequests); http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1f73fee/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc index b7d5d0b..b5f4d9a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc @@ -106,8 +106,8 @@ TEST(RpcEngineTest, TestRoundTrip) { ::asio::io_service io_service; Options options; RpcEngine engine(&io_service, options, "foo", "", "protocol", 1); - RpcConnectionImpl<MockRPCConnection> *conn = - new RpcConnectionImpl<MockRPCConnection>(&engine); + auto conn = + std::make_shared<RpcConnectionImpl<MockRPCConnection> >(&engine); conn->TEST_set_connected(true); conn->StartReading(); @@ -142,8 +142,8 @@ TEST(RpcEngineTest, TestConnectionResetAndFail) { ::asio::io_service io_service; Options options; RpcEngine engine(&io_service, options, "foo", "", "protocol", 1); - RpcConnectionImpl<MockRPCConnection> *conn = - new RpcConnectionImpl<MockRPCConnection>(&engine); + auto conn = + std::make_shared<RpcConnectionImpl<MockRPCConnection> >(&engine); conn->TEST_set_connected(true); 
conn->StartReading(); @@ -436,8 +436,8 @@ TEST(RpcEngineTest, TestTimeout) { Options options; options.rpc_timeout = 1; RpcEngine engine(&io_service, options, "foo", "", "protocol", 1); - RpcConnectionImpl<MockRPCConnection> *conn = - new RpcConnectionImpl<MockRPCConnection>(&engine); + auto conn = + std::make_shared<RpcConnectionImpl<MockRPCConnection> >(&engine); conn->TEST_set_connected(true); conn->StartReading();