This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 343ed4ff46 [ISSUE #8127] Optimize the metric calculation logic of the
time wheel (#8128)
343ed4ff46 is described below
commit 343ed4ff468debed8cbb766af0d0906b697facc4
Author: hqbfz <[email protected]>
AuthorDate: Tue Mar 4 15:53:28 2025 +0800
[ISSUE #8127] Optimize the metric calculation logic of the time wheel
(#8128)
* Fix the incorrect calculation of the time wheel metric
* Fix the incorrect calculation of the time wheel metric
---------
Co-authored-by: wanghuaiyuan <[email protected]>
---
.../rocketmq/store/timer/TimerMessageStore.java | 23 ++++++++++++++++++----
1 file changed, 19 insertions(+), 4 deletions(-)
diff --git
a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
index 4287ce78ab..d6af7b84e7 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
@@ -1571,6 +1571,9 @@ public class TimerMessageStore {
public void run() {
setState(AbstractStateService.START);
TimerMessageStore.LOGGER.info(this.getServiceName() + " service
start");
+ //Mark different rounds
+ boolean isRound = true;
+ Map<String ,MessageExt> avoidDeleteLose = new HashMap<>();
while (!this.isStopped()) {
try {
setState(AbstractStateService.WAITING);
@@ -1587,9 +1590,18 @@ public class TimerMessageStore {
MessageExt msgExt =
getMessageByCommitOffset(tr.getOffsetPy(), tr.getSizePy());
if (null != msgExt) {
if (needDelete(tr.getMagic()) &&
!needRoll(tr.getMagic())) {
+ //Clearing is performed once in each round.
+ //The deletion message is received first
and the common message is received once
+ if (!isRound) {
+ isRound = true;
+ for (MessageExt messageExt:
avoidDeleteLose.values()) {
+ addMetric(messageExt, 1);
+ }
+ avoidDeleteLose.clear();
+ }
if
(msgExt.getProperty(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY) != null &&
tr.getDeleteList() != null) {
- //Execute metric plus one for messages
that fail to be deleted
- addMetric(msgExt, 1);
+
+
avoidDeleteLose.put(msgExt.getProperty(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY),
msgExt);
tr.getDeleteList().add(msgExt.getProperty(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY));
}
tr.idempotentRelease();
@@ -1599,10 +1611,13 @@ public class TimerMessageStore {
if (null == uniqueKey) {
LOGGER.warn("No uniqueKey for msg:{}",
msgExt);
}
+ //Mark ready for next round
+ if (isRound) {
+ isRound = false;
+ }
if (null != uniqueKey &&
tr.getDeleteList() != null && tr.getDeleteList().size() > 0
&&
tr.getDeleteList().contains(buildDeleteKey(getRealTopic(msgExt), uniqueKey))) {
- //Normally, it cancels out with the +1
above
- addMetric(msgExt, -1);
+ avoidDeleteLose.remove(uniqueKey);
doRes = true;
tr.idempotentRelease();
perfCounterTicks.getCounter("dequeue_delete").flow(1);