This is an automated email from the ASF dual-hosted git repository. zhaijia pushed a commit to branch branch-2.6 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.6 by this push: new bf83b1b [C++] Fix some pending requests may never complete when broker's down (#8232) bf83b1b is described below commit bf83b1b29dcb77a740b27d9ab2369fdf78b1d4b4 Author: Yunze Xu <xyzinfern...@163.com> AuthorDate: Mon Oct 12 15:14:39 2020 +0800 [C++] Fix some pending requests may never complete when broker's down (#8232) Fixes #8230 ### Motivation When `ClientConnection` calls `close`, some promises of pending requests were not completed. This may cause methods that wait for those promises to complete, like `Reader::hasMessageAvailable`, to never return. ### Modifications - Complete pending `GetLastMessageId` and `GetNamespaceTopics` requests in `ClientConnection::close`. - Refactor `ClientConnection::close` to use move instead of swap, so that some fields are processed outside the lock section. - Add an error log before the connection is closed due to a read error. ### Verifying this change When reproducing #8230, `Reader::hasMessageAvailable` now returns `ResultConnectError` if the broker is killed by force. 
(cherry picked from commit 5e60775fcf607be202641a45166c5d8781e3de87) --- pulsar-client-cpp/lib/ClientConnection.cc | 72 +++++++++++++++---------------- 1 file changed, 35 insertions(+), 37 deletions(-) diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc index ebb7268..6394d25 100644 --- a/pulsar-client-cpp/lib/ClientConnection.cc +++ b/pulsar-client-cpp/lib/ClientConnection.cc @@ -498,6 +498,9 @@ void ClientConnection::handleRead(const boost::system::error_code& err, size_t b incomingBuffer_.bytesWritten(bytesTransferred); if (err || bytesTransferred == 0) { + if (err) { + LOG_ERROR(cnxString_ << "Read failed: " << err.message()); + } // else: bytesTransferred == 0, which means server has closed the connection close(); } else if (bytesTransferred < minReadSize) { // Read the remaining part, use a slice of buffer to write on the next @@ -1378,28 +1381,39 @@ void ClientConnection::close() { state_ = Disconnected; boost::system::error_code err; socket_->close(err); - ConsumersMap consumers; - consumers.swap(consumers_); - ProducersMap producers; - producers.swap(producers_); - lock.unlock(); - LOG_INFO(cnxString_ << "Connection closed"); + if (tlsSocket_) { + tlsSocket_->lowest_layer().close(); + } + + if (executor_) { + executor_.reset(); + } + + // Move the internal fields to process them after `mutex_` was unlocked + auto consumers = std::move(consumers_); + auto producers = std::move(producers_); + auto pendingRequests = std::move(pendingRequests_); + auto pendingLookupRequests = std::move(pendingLookupRequests_); + auto pendingConsumerStatsMap = std::move(pendingConsumerStatsMap_); + auto pendingGetLastMessageIdRequests = std::move(pendingGetLastMessageIdRequests_); + auto pendingGetNamespaceTopicsRequests = std::move(pendingGetNamespaceTopicsRequests_); + + numOfPendingLookupRequest_ = 0; if (keepAliveTimer_) { - lock.lock(); keepAliveTimer_->cancel(); keepAliveTimer_.reset(); - lock.unlock(); } if 
(consumerStatsRequestTimer_) { - lock.lock(); consumerStatsRequestTimer_->cancel(); consumerStatsRequestTimer_.reset(); - lock.unlock(); } + lock.unlock(); + LOG_INFO(cnxString_ << "Connection closed"); + for (ProducersMap::iterator it = producers.begin(); it != producers.end(); ++it) { HandlerBase::handleDisconnection(ResultConnectError, shared_from_this(), it->second); } @@ -1410,38 +1424,22 @@ void ClientConnection::close() { connectPromise_.setFailed(ResultConnectError); - // Fail all pending operations on the connection - for (PendingRequestsMap::iterator it = pendingRequests_.begin(); it != pendingRequests_.end(); ++it) { - it->second.promise.setFailed(ResultConnectError); + // Fail all pending requests, all these type are map whose value type contains the Promise object + for (auto& kv : pendingRequests) { + kv.second.promise.setFailed(ResultConnectError); } - - // Fail all pending lookup-requests on the connection - lock.lock(); - PendingLookupRequestsMap pendingLookupRequests; - pendingLookupRequests_.swap(pendingLookupRequests); - numOfPendingLookupRequest_ -= pendingLookupRequests.size(); - - PendingConsumerStatsMap pendingConsumerStatsMap; - pendingConsumerStatsMap_.swap(pendingConsumerStatsMap); - lock.unlock(); - - for (PendingLookupRequestsMap::iterator it = pendingLookupRequests.begin(); - it != pendingLookupRequests.end(); ++it) { - it->second.promise->setFailed(ResultConnectError); + for (auto& kv : pendingLookupRequests) { + kv.second.promise->setFailed(ResultConnectError); } - - for (PendingConsumerStatsMap::iterator it = pendingConsumerStatsMap.begin(); - it != pendingConsumerStatsMap.end(); ++it) { + for (auto& kv : pendingConsumerStatsMap) { LOG_ERROR(cnxString_ << " Closing Client Connection, please try again later"); - it->second.setFailed(ResultConnectError); + kv.second.setFailed(ResultConnectError); } - - if (tlsSocket_) { - tlsSocket_->lowest_layer().close(); + for (auto& kv : pendingGetLastMessageIdRequests) { + 
kv.second.setFailed(ResultConnectError); } - - if (executor_) { - executor_.reset(); + for (auto& kv : pendingGetNamespaceTopicsRequests) { + kv.second.setFailed(ResultConnectError); } }