This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 4aa6cdd99e [ISSUE #10019] Revert "[ISSUE #8127]Optimize the metric
calculation logic of the time wheel" (#10020)
4aa6cdd99e is described below
commit 4aa6cdd99ec25da020d7ffb51883184b46292d52
Author: rongtong <[email protected]>
AuthorDate: Wed Jan 21 14:15:00 2026 +0800
[ISSUE #10019] Revert "[ISSUE #8127]Optimize the metric calculation logic
of the time wheel" (#10020)
* Revert "[ISSUE #8127]Optimize the metric calculation logic of the time
wheel"
* Revert "[ISSUE #8127]Optimize the metric calculation logic of the time
wheel"
* Revert "[ISSUE #8127]Optimize the metric calculation logic of the time
wheel"
---------
Co-authored-by: RongtongJin <[email protected]>
---
.../rocketmq/store/timer/TimerMessageStore.java | 26 +++++-----------------
1 file changed, 5 insertions(+), 21 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
index cd5e9f4480..390dec9f98 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
@@ -1699,9 +1699,6 @@ public class TimerMessageStore {
public void run() {
setState(AbstractStateService.START);
TimerMessageStore.LOGGER.info(this.getServiceName() + " service
start");
- //Mark different rounds
- boolean isRound = true;
- Map<String, MessageExt> avoidDeleteLose = new HashMap<>();
while (!this.isStopped()) {
try {
setState(AbstractStateService.WAITING);
@@ -1718,18 +1715,9 @@ public class TimerMessageStore {
MessageExt msgExt =
getMessageByCommitOffset(tr.getOffsetPy(), tr.getSizePy());
if (null != msgExt) {
if (needDelete(tr.getMagic()) &&
!needRoll(tr.getMagic())) {
- //Clearing is performed once in each round.
- //The deletion message is received first
and the common message is received once
- if (!isRound) {
- isRound = true;
- for (MessageExt messageExt :
avoidDeleteLose.values()) {
- addMetric(messageExt, 1);
- }
- avoidDeleteLose.clear();
- }
if
(msgExt.getProperty(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY) != null &&
tr.getDeleteList() != null) {
-
-
avoidDeleteLose.put(msgExt.getProperty(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY),
msgExt);
+ //Execute metric plus one for messages
that fail to be deleted
+ addMetric(msgExt, 1);
tr.getDeleteList().add(msgExt.getProperty(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY));
}
tr.idempotentRelease();
@@ -1739,13 +1727,9 @@ public class TimerMessageStore {
if (null == uniqueKey) {
LOGGER.warn("No uniqueKey for msg:{}",
msgExt);
}
- //Mark ready for next round
- if (isRound) {
- isRound = false;
- }
- if (null != uniqueKey &&
tr.getDeleteList() != null && tr.getDeleteList().size() > 0
- &&
tr.getDeleteList().contains(buildDeleteKey(getRealTopic(msgExt), uniqueKey,
storeConfig.isAppendTopicForTimerDeleteKey()))) {
- avoidDeleteLose.remove(uniqueKey);
+ if (null != uniqueKey &&
tr.getDeleteList() != null && tr.getDeleteList().size() > 0 &&
tr.getDeleteList().contains(buildDeleteKey(getRealTopic(msgExt), uniqueKey,
storeConfig.isAppendTopicForTimerDeleteKey()))) {
+ //Normally, it cancels out with the +1
above
+ addMetric(msgExt, -1);
doRes = true;
tr.idempotentRelease();
perfCounterTicks.getCounter("dequeue_delete").flow(1);