This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 16da3724cac [fix][broker] Increment topic stats outbound message counters after messages have been written to the TCP/IP connection (#17043)
16da3724cac is described below

commit 16da3724cacdb66dee744c2c6b2d75258d7e95ae
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Aug 11 07:12:21 2022 +0300

    [fix][broker] Increment topic stats outbound message counters after messages have been written to the TCP/IP connection (#17043)
    
    (cherry picked from commit 2bc933ee71421f60879c99c7888c96e95d5c9386)
---
 .../org/apache/pulsar/broker/service/Consumer.java  | 21 +++++++++++++--------
 1 file changed, 13 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index ba6b3c3c297..2fea1a1b854 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -290,14 +290,19 @@ public class Consumer {
                    topicName, subscription, ackedCount, totalMessages, 
consumerId, avgMessagesPerEntry.get());
         }
         incrementUnackedMessages(unackedMessages);
-        msgOut.recordMultipleEvents(totalMessages, totalBytes);
-        msgOutCounter.add(totalMessages);
-        bytesOutCounter.add(totalBytes);
-        chunkedMessageRate.recordMultipleEvents(totalChunkedMessages, 0);
-
-
-        return cnx.getCommandSender().sendMessagesToConsumer(consumerId, 
topicName, subscription, partitionIdx,
-                entries, batchSizes, batchIndexesAcks, redeliveryTracker);
+        Future<Void> writeAndFlushPromise =
+                cnx.getCommandSender().sendMessagesToConsumer(consumerId, 
topicName, subscription, partitionIdx,
+                        entries, batchSizes, batchIndexesAcks, 
redeliveryTracker);
+        writeAndFlushPromise.addListener(status -> {
+            // only increment counters after the messages have been 
successfully written to the TCP/IP connection
+            if (status.isSuccess()) {
+                msgOut.recordMultipleEvents(totalMessages, totalBytes);
+                msgOutCounter.add(totalMessages);
+                bytesOutCounter.add(totalBytes);
+                chunkedMessageRate.recordMultipleEvents(totalChunkedMessages, 
0);
+            }
+        });
+        return writeAndFlushPromise;
     }
 
     private void incrementUnackedMessages(int ackedMessages) {

Reply via email to