This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new c4196fba3ae [fix][broker] Record GeoPersistentReplicator.msgOut before producer#sendAsync (#21673)
c4196fba3ae is described below

commit c4196fba3ae107d74f9421d3f7ed11c0c245f10f
Author: Zixuan Liu <node...@gmail.com>
AuthorDate: Fri Dec 8 10:32:58 2023 +0800

    [fix][broker] Record GeoPersistentReplicator.msgOut before producer#sendAsync (#21673)
    
    Signed-off-by: Zixuan Liu <node...@gmail.com>
---
 .../pulsar/broker/service/persistent/GeoPersistentReplicator.java     | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
index 08882982297..b8287dd2c14 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
@@ -149,9 +149,6 @@ public class GeoPersistentReplicator extends PersistentReplicator {
                 }
 
                 dispatchRateLimiter.ifPresent(rateLimiter -> rateLimiter.tryDispatchPermit(1, entry.getLength()));
-
-                msgOut.recordEvent(headersAndPayload.readableBytes());
-
                 msg.setReplicatedFrom(localCluster);
 
                 headersAndPayload.retain();
@@ -181,6 +178,7 @@ public class GeoPersistentReplicator extends PersistentReplicator {
                     msg.setSchemaInfoForReplicator(schemaFuture.get());
                     msg.getMessageBuilder().clearTxnidMostBits();
                     msg.getMessageBuilder().clearTxnidLeastBits();
+                    msgOut.recordEvent(headersAndPayload.readableBytes());
                     // Increment pending messages for messages produced locally
                     PENDING_MESSAGES_UPDATER.incrementAndGet(this);
                     producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg));

Reply via email to