This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.1 by this push: new c4196fba3ae [fix][broker] Record GeoPersistentReplicator.msgOut before producer#sendAsync (#21673) c4196fba3ae is described below commit c4196fba3ae107d74f9421d3f7ed11c0c245f10f Author: Zixuan Liu <node...@gmail.com> AuthorDate: Fri Dec 8 10:32:58 2023 +0800 [fix][broker] Record GeoPersistentReplicator.msgOut before producer#sendAsync (#21673) Signed-off-by: Zixuan Liu <node...@gmail.com> --- .../pulsar/broker/service/persistent/GeoPersistentReplicator.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java index 08882982297..b8287dd2c14 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java @@ -149,9 +149,6 @@ public class GeoPersistentReplicator extends PersistentReplicator { } dispatchRateLimiter.ifPresent(rateLimiter -> rateLimiter.tryDispatchPermit(1, entry.getLength())); - - msgOut.recordEvent(headersAndPayload.readableBytes()); - msg.setReplicatedFrom(localCluster); headersAndPayload.retain(); @@ -181,6 +178,7 @@ public class GeoPersistentReplicator extends PersistentReplicator { msg.setSchemaInfoForReplicator(schemaFuture.get()); msg.getMessageBuilder().clearTxnidMostBits(); msg.getMessageBuilder().clearTxnidLeastBits(); + msgOut.recordEvent(headersAndPayload.readableBytes()); // Increment pending messages for messages produced locally PENDING_MESSAGES_UPDATER.incrementAndGet(this); producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg));