This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 070132b   [improve][client] Enhance connection and timeout logging 
(#539)
070132b is described below

commit 070132b13e38d025673b053127537b53f1c4ff35
Author: zhanglistar <[email protected]>
AuthorDate: Tue Mar 3 15:05:13 2026 +0800

     [improve][client] Enhance connection and timeout logging (#539)
---
 lib/ClientConnection.cc             | 30 ++++++++++++++++++++++++++----
 lib/ConsumerImpl.cc                 |  8 ++++++++
 lib/MultiTopicsConsumerImpl.cc      |  9 +++++++++
 lib/ProducerImpl.cc                 |  9 +++++++++
 lib/UnAckedMessageTrackerEnabled.cc | 14 +++++++++++---
 5 files changed, 63 insertions(+), 7 deletions(-)

diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index 1d488d8..8665a7e 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -56,6 +56,16 @@ using namespace ASIO::ip;
 
 namespace pulsar {
 
+namespace {
+static std::ostream& operator<<(std::ostream& os, const 
tcp::resolver::results_type& results) {
+    for (const auto& entry : results) {
+        const auto& ep = entry.endpoint();
+        os << ep.address().to_string() << ":" << ep.port() << " ";
+    }
+    return os;
+}
+}  // anonymous namespace
+
 using proto::BaseCommand;
 
 static const uint32_t DefaultBufferSize = 64 * 1024;
@@ -486,7 +496,7 @@ void ClientConnection::handleTcpConnected(const ASIO_ERROR& 
err, const tcp::endp
             handleHandshake(ASIO_SUCCESS);
         }
     } else {
-        LOG_ERROR(cnxString_ << "Failed to establish connection: " << 
err.message());
+        LOG_ERROR(cnxString_ << "Failed to establish connection to " << 
endpoint << ": " << err.message());
         if (err == ASIO::error::operation_aborted) {
             close();
         } else {
@@ -603,16 +613,25 @@ void ClientConnection::handleResolve(ASIO_ERROR err, 
const tcp::resolver::result
         return;
     }
 
+    if (!results.empty()) {
+        LOG_DEBUG(cnxString_ << "Resolved " << results.size() << " endpoints");
+        for (const auto& entry : results) {
+            const auto& ep = entry.endpoint();
+            LOG_DEBUG(cnxString_ << "  " << ep.address().to_string() << ":" << 
ep.port());
+        }
+    }
+
     auto weakSelf = weak_from_this();
-    connectTimeoutTask_->setCallback([weakSelf](const PeriodicTask::ErrorCode& 
ec) {
+    connectTimeoutTask_->setCallback([weakSelf, results = 
tcp::resolver::results_type(results)](
+                                         const PeriodicTask::ErrorCode& ec) {
         ClientConnectionPtr ptr = weakSelf.lock();
         if (!ptr) {
-            // Connection was already destroyed
+            LOG_DEBUG("Connect timeout callback skipped: connection was 
already destroyed");
             return;
         }
 
         if (ptr->state_ != Ready) {
-            LOG_ERROR(ptr->cnxString_ << "Connection was not established in "
+            LOG_ERROR(ptr->cnxString_ << "Connection to " << results << " was 
not established in "
                                       << 
ptr->connectTimeoutTask_->getPeriodMs() << " ms, close the socket");
             PeriodicTask::ErrorCode err;
             ptr->socket_->close(err);
@@ -1212,6 +1231,7 @@ Future<Result, ResponseData> 
ClientConnection::sendRequestWithId(const SharedBuf
 void ClientConnection::handleRequestTimeout(const ASIO_ERROR& ec,
                                             const PendingRequestData& 
pendingRequestData) {
     if (!ec && !pendingRequestData.hasGotResponse->load()) {
+        LOG_WARN(cnxString_ << "Network request timeout to broker, remote: " 
<< physicalAddress_);
         pendingRequestData.promise.setFailed(ResultTimeout);
     }
 }
@@ -1219,6 +1239,7 @@ void ClientConnection::handleRequestTimeout(const 
ASIO_ERROR& ec,
 void ClientConnection::handleLookupTimeout(const ASIO_ERROR& ec,
                                            const LookupRequestData& 
pendingRequestData) {
     if (!ec) {
+        LOG_WARN(cnxString_ << "Lookup request timeout to broker, remote: " << 
physicalAddress_);
         pendingRequestData.promise->setFailed(ResultTimeout);
     }
 }
@@ -1226,6 +1247,7 @@ void ClientConnection::handleLookupTimeout(const 
ASIO_ERROR& ec,
 void ClientConnection::handleGetLastMessageIdTimeout(const ASIO_ERROR& ec,
                                                      const 
ClientConnection::LastMessageIdRequestData& data) {
     if (!ec) {
+        LOG_WARN(cnxString_ << "GetLastMessageId request timeout to broker, 
remote: " << physicalAddress_);
         data.promise->setFailed(ResultTimeout);
     }
 }
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index a645f58..757b6e8 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -1104,6 +1104,14 @@ Result ConsumerImpl::receiveHelper(Message& msg, int 
timeout) {
         if (state_ != Ready) {
             return ResultAlreadyClosed;
         }
+        auto cnx = getCnx().lock();
+        if (cnx) {
+            LOG_WARN(getName() << " Receive timeout after " << timeout << " 
ms, connection: "
+                               << cnx->cnxString() << ", queue size: " << 
incomingMessages_.size());
+        } else {
+            LOG_WARN(getName() << " Receive timeout after " << timeout
+                               << " ms, no connection, queue size: " << 
incomingMessages_.size());
+        }
         return ResultTimeout;
     }
 }
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index 9c741fa..0799eb6 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -21,6 +21,7 @@
 #include <chrono>
 #include <stdexcept>
 
+#include "ClientConnection.h"
 #include "ClientImpl.h"
 #include "ConsumerImpl.h"
 #include "ExecutorService.h"
@@ -600,6 +601,14 @@ Result MultiTopicsConsumerImpl::receive(Message& msg, int 
timeout) {
         if (state_ != Ready) {
             return ResultAlreadyClosed;
         }
+        auto cnx = getCnx().lock();
+        if (cnx) {
+            LOG_WARN(getName() << " Receive timeout after " << timeout << " 
ms, connection: "
+                               << cnx->cnxString() << ", queue size: " << 
incomingMessages_.size());
+        } else {
+            LOG_WARN(getName() << " Receive timeout after " << timeout
+                               << " ms, no connection, queue size: " << 
incomingMessages_.size());
+        }
         return ResultTimeout;
     }
 }
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index 7fd14c7..360e128 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -868,6 +868,15 @@ void ProducerImpl::handleSendTimeout(const ASIO_ERROR& 
err) {
     }
 
     lock.unlock();
+    auto cnx = getCnx().lock();
+    if (cnx) {
+        LOG_WARN(getName() << "Send timeout due to queueing delay, connection: 
" << cnx->cnxString()
+                           << ", pending messages: " << pendingMessages.size()
+                           << ", queue size: " << 
pendingMessagesQueue_.size());
+    } else {
+        LOG_WARN(getName() << "Send timeout due to queueing delay, no 
connection, pending messages: "
+                           << pendingMessages.size() << ", queue size: " << 
pendingMessagesQueue_.size());
+    }
     for (const auto& op : pendingMessages) {
         op->complete(ResultTimeout, {});
     }
diff --git a/lib/UnAckedMessageTrackerEnabled.cc 
b/lib/UnAckedMessageTrackerEnabled.cc
index e5bd3d2..1c7b20e 100644
--- a/lib/UnAckedMessageTrackerEnabled.cc
+++ b/lib/UnAckedMessageTrackerEnabled.cc
@@ -20,6 +20,7 @@
 
 #include <functional>
 
+#include "ClientConnection.h"
 #include "ClientImpl.h"
 #include "ConsumerImplBase.h"
 #include "ExecutorService.h"
@@ -57,9 +58,16 @@ void UnAckedMessageTrackerEnabled::timeoutHandlerHelper() {
 
     std::set<MessageId> msgIdsToRedeliver;
     if (!headPartition.empty()) {
-        LOG_INFO(consumerReference_.getName().c_str()
-                 << ": " << headPartition.size() << " Messages were not acked 
within "
-                 << timePartitions.size() * tickDurationInMs_ << " time");
+        auto cnx = consumerReference_.getCnx().lock();
+        if (cnx) {
+            LOG_WARN(consumerReference_.getName()
+                     << " Unacked messages timeout: " << headPartition.size() 
<< " messages not acked within "
+                     << timeoutMs_ << " ms, connection: " << cnx->cnxString());
+        } else {
+            LOG_WARN(consumerReference_.getName()
+                     << " Unacked messages timeout: " << headPartition.size() 
<< " messages not acked within "
+                     << timeoutMs_ << " ms, no connection");
+        }
         for (auto it = headPartition.begin(); it != headPartition.end(); it++) 
{
             msgIdsToRedeliver.insert(*it);
             messageIdPartitionMap.erase(*it);

Reply via email to