This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b936f46d2dd [improve][broker] Optimize message expiration rate repeated update issues (#24073)
b936f46d2dd is described below
commit b936f46d2dd2ade98521b7825ee9277549703ef1
Author: zhenJiangWang <[email protected]>
AuthorDate: Thu Mar 20 02:15:18 2025 +0800
[improve][broker] Optimize message expiration rate repeated update issues (#24073)
Co-authored-by: zjxxzjwang <[email protected]>
---
.../broker/service/persistent/PersistentMessageExpiryMonitor.java | 4 ++--
.../apache/pulsar/broker/service/persistent/PersistentReplicator.java | 1 -
.../org/apache/pulsar/broker/service/persistent/PersistentTopic.java | 1 -
3 files changed, 2 insertions(+), 4 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index 3b4bc9d8bce..4cd696b0eb7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -170,11 +170,12 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback, Messag
}
- public void updateRates() {
+ private void updateRates() {
msgExpired.calculateRate();
}
public double getMessageExpiryRate() {
+ updateRates();
return msgExpired.getRate();
}
@@ -190,7 +191,6 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback, Messag
long numMessagesExpired = (long) ctx - cursor.getNumberOfEntriesInBacklog(false);
msgExpired.recordMultipleEvents(numMessagesExpired, 0 /* no value stats */);
totalMsgExpired.add(numMessagesExpired);
- updateRates();
// If the subscription is a Key_Shared subscription, we should to trigger message dispatch.
if (subscription != null && subscription.getType() == SubType.Key_Shared) {
subscription.getDispatcher().markDeletePositionMoveForward();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 881b1b804e8..1b52d5dee67 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -639,7 +639,6 @@ public abstract class PersistentReplicator extends AbstractReplicator
public void updateRates() {
msgOut.calculateRate();
msgExpired.calculateRate();
- expiryMonitor.updateRates();
stats.msgRateOut = msgOut.getRate();
stats.msgThroughputOut = msgOut.getValueRate();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 704cf2b393c..43908329fc0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2457,7 +2457,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
// Populate subscription specific stats here
topicStatsStream.writePair("msgBacklog", subscription.getNumberOfEntriesInBacklog(true));
- subscription.getExpiryMonitor().updateRates();
topicStatsStream.writePair("msgRateExpired", subscription.getExpiredMessageRate());
topicStatsStream.writePair("msgRateOut", subMsgRateOut);
topicStatsStream.writePair("messageAckRate", subMsgAckRate);