This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 3a2d674e0b31d5488a5c5183ca54c6701543741b Author: zhenJiangWang <[email protected]> AuthorDate: Fri May 23 17:38:34 2025 +0800 [fix][broker] Resolve the issue of frequent updates in message expiration deletion rate (#24190) Co-authored-by: zjxxzjwang <[email protected]> (cherry picked from commit 81c94c8a912cf08ac3f561a2aab3fa910d781db8) --- .../broker/service/persistent/PersistentMessageExpiryMonitor.java | 3 +-- .../apache/pulsar/broker/service/persistent/PersistentReplicator.java | 1 + .../org/apache/pulsar/broker/service/persistent/PersistentTopic.java | 1 + .../java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java | 4 ++++ 4 files changed, 7 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java index f8d76e2dedd..7a18f4abe47 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java @@ -173,12 +173,11 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback, Messag } - private void updateRates() { + public void updateRates() { msgExpired.calculateRate(); } public double getMessageExpiryRate() { - updateRates(); return msgExpired.getRate(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index 9f4cb8a1e5f..0afc4cd3ce8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -594,6 +594,7 @@ public abstract class PersistentReplicator extends AbstractReplicator public void updateRates() { msgOut.calculateRate(); msgExpired.calculateRate(); + expiryMonitor.updateRates(); stats.msgRateOut = msgOut.getRate(); stats.msgThroughputOut = msgOut.getValueRate(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index d8c2eadd039..8685f92138f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2572,6 +2572,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal // Populate subscription specific stats here topicStatsStream.writePair("msgBacklog", subscription.getNumberOfEntriesInBacklog(true)); + subscription.getExpiryMonitor().updateRates(); topicStatsStream.writePair("msgRateExpired", subscription.getExpiredMessageRate()); topicStatsStream.writePair("msgRateOut", subMsgRateOut); topicStatsStream.writePair("messageAckRate", subMsgAckRate); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index cf590a2bb3f..2fd9cadd192 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -806,6 +806,10 @@ public class PrometheusMetricsTest extends BrokerTestBase { pulsar.getBrokerService().getTopicIfExists(topic1).get().get().getSubscription(subName); PersistentSubscription sub2 = (PersistentSubscription) pulsar.getBrokerService().getTopicIfExists(topic2).get().get().getSubscription(subName); + Awaitility.await().until(() -> sub.getExpiryMonitor().getTotalMessageExpired() != 0); + Awaitility.await().until(() -> sub2.getExpiryMonitor().getTotalMessageExpired() != 0); + sub.getExpiryMonitor().updateRates(); + sub2.getExpiryMonitor().updateRates(); Awaitility.await().until(() -> sub.getExpiredMessageRate() != 0.0); Awaitility.await().until(() -> sub2.getExpiredMessageRate() != 0.0);
