This is an automated email from the ASF dual-hosted git repository.

daojun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c52af1d5c73 [improve][client] Print consumer stats log if prefetched messages are not zero (#23698)
c52af1d5c73 is described below

commit c52af1d5c733ec05b40fc72e63a202f20d25603b
Author: Penghui Li <[email protected]>
AuthorDate: Thu Dec 12 21:32:20 2024 +0800

    [improve][client] Print consumer stats log if prefetched messages are not zero (#23698)
---
 .../org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java     | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
index 8dfc0af8e1d..5cbbcc44298 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
@@ -146,15 +146,16 @@ public class ConsumerStatsRecorderImpl implements ConsumerStatsRecorder {
 
                 receivedMsgsRate = currentNumMsgsReceived / elapsed;
                 receivedBytesRate = currentNumBytesReceived / elapsed;
+                int prefetchQueueSize = consumerImpl.incomingMessages.size();
                 if ((currentNumMsgsReceived | currentNumBytesReceived | 
currentNumReceiveFailed | currentNumAcksSent
-                        | currentNumAcksFailed) != 0) {
+                        | currentNumAcksFailed | prefetchQueueSize) != 0) {
                     log.info(
                             "[{}] [{}] [{}] Prefetched messages: {} --- "
                                     + "Consume throughput received: {} msgs/s 
--- {} Mbit/s --- "
                                     + "Ack sent rate: {} ack/s --- " + "Failed 
messages: {} --- batch messages: {} ---"
                                     + "Failed acks: {}",
                             consumerImpl.getTopic(), 
consumerImpl.getSubscription(), consumerImpl.consumerName,
-                            consumerImpl.incomingMessages.size(), 
THROUGHPUT_FORMAT.format(receivedMsgsRate),
+                            prefetchQueueSize, 
THROUGHPUT_FORMAT.format(receivedMsgsRate),
                             THROUGHPUT_FORMAT.format(receivedBytesRate * 8 / 
1024 / 1024),
                             THROUGHPUT_FORMAT.format(currentNumAcksSent / 
elapsed), currentNumReceiveFailed,
                             currentNumBatchReceiveFailed, 
currentNumAcksFailed);

Reply via email to