This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 070132b [improve][client] Enhance connection and timeout logging (#539)
070132b is described below
commit 070132b13e38d025673b053127537b53f1c4ff35
Author: zhanglistar <[email protected]>
AuthorDate: Tue Mar 3 15:05:13 2026 +0800
[improve][client] Enhance connection and timeout logging (#539)
---
lib/ClientConnection.cc | 30 ++++++++++++++++++++++++++----
lib/ConsumerImpl.cc | 8 ++++++++
lib/MultiTopicsConsumerImpl.cc | 9 +++++++++
lib/ProducerImpl.cc | 9 +++++++++
lib/UnAckedMessageTrackerEnabled.cc | 14 +++++++++++---
5 files changed, 63 insertions(+), 7 deletions(-)
diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index 1d488d8..8665a7e 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -56,6 +56,16 @@ using namespace ASIO::ip;
namespace pulsar {
+namespace {
+static std::ostream& operator<<(std::ostream& os, const
tcp::resolver::results_type& results) {
+ for (const auto& entry : results) {
+ const auto& ep = entry.endpoint();
+ os << ep.address().to_string() << ":" << ep.port() << " ";
+ }
+ return os;
+}
+} // anonymous namespace
+
using proto::BaseCommand;
static const uint32_t DefaultBufferSize = 64 * 1024;
@@ -486,7 +496,7 @@ void ClientConnection::handleTcpConnected(const ASIO_ERROR&
err, const tcp::endp
handleHandshake(ASIO_SUCCESS);
}
} else {
- LOG_ERROR(cnxString_ << "Failed to establish connection: " <<
err.message());
+ LOG_ERROR(cnxString_ << "Failed to establish connection to " <<
endpoint << ": " << err.message());
if (err == ASIO::error::operation_aborted) {
close();
} else {
@@ -603,16 +613,25 @@ void ClientConnection::handleResolve(ASIO_ERROR err,
const tcp::resolver::result
return;
}
+ if (!results.empty()) {
+ LOG_DEBUG(cnxString_ << "Resolved " << results.size() << " endpoints");
+ for (const auto& entry : results) {
+ const auto& ep = entry.endpoint();
+ LOG_DEBUG(cnxString_ << " " << ep.address().to_string() << ":" <<
ep.port());
+ }
+ }
+
auto weakSelf = weak_from_this();
- connectTimeoutTask_->setCallback([weakSelf](const PeriodicTask::ErrorCode&
ec) {
+ connectTimeoutTask_->setCallback([weakSelf, results =
tcp::resolver::results_type(results)](
+ const PeriodicTask::ErrorCode& ec) {
ClientConnectionPtr ptr = weakSelf.lock();
if (!ptr) {
- // Connection was already destroyed
+ LOG_DEBUG("Connect timeout callback skipped: connection was
already destroyed");
return;
}
if (ptr->state_ != Ready) {
- LOG_ERROR(ptr->cnxString_ << "Connection was not established in "
+ LOG_ERROR(ptr->cnxString_ << "Connection to " << results << " was
not established in "
<<
ptr->connectTimeoutTask_->getPeriodMs() << " ms, close the socket");
PeriodicTask::ErrorCode err;
ptr->socket_->close(err);
@@ -1212,6 +1231,7 @@ Future<Result, ResponseData>
ClientConnection::sendRequestWithId(const SharedBuf
void ClientConnection::handleRequestTimeout(const ASIO_ERROR& ec,
const PendingRequestData&
pendingRequestData) {
if (!ec && !pendingRequestData.hasGotResponse->load()) {
+ LOG_WARN(cnxString_ << "Network request timeout to broker, remote: "
<< physicalAddress_);
pendingRequestData.promise.setFailed(ResultTimeout);
}
}
@@ -1219,6 +1239,7 @@ void ClientConnection::handleRequestTimeout(const
ASIO_ERROR& ec,
void ClientConnection::handleLookupTimeout(const ASIO_ERROR& ec,
const LookupRequestData&
pendingRequestData) {
if (!ec) {
+ LOG_WARN(cnxString_ << "Lookup request timeout to broker, remote: " <<
physicalAddress_);
pendingRequestData.promise->setFailed(ResultTimeout);
}
}
@@ -1226,6 +1247,7 @@ void ClientConnection::handleLookupTimeout(const
ASIO_ERROR& ec,
void ClientConnection::handleGetLastMessageIdTimeout(const ASIO_ERROR& ec,
const
ClientConnection::LastMessageIdRequestData& data) {
if (!ec) {
+ LOG_WARN(cnxString_ << "GetLastMessageId request timeout to broker,
remote: " << physicalAddress_);
data.promise->setFailed(ResultTimeout);
}
}
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index a645f58..757b6e8 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -1104,6 +1104,14 @@ Result ConsumerImpl::receiveHelper(Message& msg, int
timeout) {
if (state_ != Ready) {
return ResultAlreadyClosed;
}
+ auto cnx = getCnx().lock();
+ if (cnx) {
+ LOG_WARN(getName() << " Receive timeout after " << timeout << "
ms, connection: "
+ << cnx->cnxString() << ", queue size: " <<
incomingMessages_.size());
+ } else {
+ LOG_WARN(getName() << " Receive timeout after " << timeout
+ << " ms, no connection, queue size: " <<
incomingMessages_.size());
+ }
return ResultTimeout;
}
}
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index 9c741fa..0799eb6 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -21,6 +21,7 @@
#include <chrono>
#include <stdexcept>
+#include "ClientConnection.h"
#include "ClientImpl.h"
#include "ConsumerImpl.h"
#include "ExecutorService.h"
@@ -600,6 +601,14 @@ Result MultiTopicsConsumerImpl::receive(Message& msg, int
timeout) {
if (state_ != Ready) {
return ResultAlreadyClosed;
}
+ auto cnx = getCnx().lock();
+ if (cnx) {
+ LOG_WARN(getName() << " Receive timeout after " << timeout << "
ms, connection: "
+ << cnx->cnxString() << ", queue size: " <<
incomingMessages_.size());
+ } else {
+ LOG_WARN(getName() << " Receive timeout after " << timeout
+ << " ms, no connection, queue size: " <<
incomingMessages_.size());
+ }
return ResultTimeout;
}
}
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index 7fd14c7..360e128 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -868,6 +868,15 @@ void ProducerImpl::handleSendTimeout(const ASIO_ERROR&
err) {
}
lock.unlock();
+ auto cnx = getCnx().lock();
+ if (cnx) {
+ LOG_WARN(getName() << "Send timeout due to queueing delay, connection:
" << cnx->cnxString()
+ << ", pending messages: " << pendingMessages.size()
+ << ", queue size: " <<
pendingMessagesQueue_.size());
+ } else {
+ LOG_WARN(getName() << "Send timeout due to queueing delay, no
connection, pending messages: "
+ << pendingMessages.size() << ", queue size: " <<
pendingMessagesQueue_.size());
+ }
for (const auto& op : pendingMessages) {
op->complete(ResultTimeout, {});
}
diff --git a/lib/UnAckedMessageTrackerEnabled.cc
b/lib/UnAckedMessageTrackerEnabled.cc
index e5bd3d2..1c7b20e 100644
--- a/lib/UnAckedMessageTrackerEnabled.cc
+++ b/lib/UnAckedMessageTrackerEnabled.cc
@@ -20,6 +20,7 @@
#include <functional>
+#include "ClientConnection.h"
#include "ClientImpl.h"
#include "ConsumerImplBase.h"
#include "ExecutorService.h"
@@ -57,9 +58,16 @@ void UnAckedMessageTrackerEnabled::timeoutHandlerHelper() {
std::set<MessageId> msgIdsToRedeliver;
if (!headPartition.empty()) {
- LOG_INFO(consumerReference_.getName().c_str()
- << ": " << headPartition.size() << " Messages were not acked
within "
- << timePartitions.size() * tickDurationInMs_ << " time");
+ auto cnx = consumerReference_.getCnx().lock();
+ if (cnx) {
+ LOG_WARN(consumerReference_.getName()
+ << " Unacked messages timeout: " << headPartition.size()
<< " messages not acked within "
+ << timeoutMs_ << " ms, connection: " << cnx->cnxString());
+ } else {
+ LOG_WARN(consumerReference_.getName()
+ << " Unacked messages timeout: " << headPartition.size()
<< " messages not acked within "
+ << timeoutMs_ << " ms, no connection");
+ }
for (auto it = headPartition.begin(); it != headPartition.end(); it++)
{
msgIdsToRedeliver.insert(*it);
messageIdPartitionMap.erase(*it);