This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 2538c3414d [ISSUE #9106] Fix revive backoff retry not effective in Pop Consumption based on rocksdb (#9107)
2538c3414d is described below
commit 2538c3414d17604b930bcd52dba15edf210c4ab8
Author: Liu Shengzhong <[email protected]>
AuthorDate: Mon Jan 6 10:03:34 2025 +0800
[ISSUE #9106] Fix revive backoff retry not effective in Pop Consumption based on rocksdb (#9107)
---
.../rocketmq/broker/pop/PopConsumerService.java | 11 ++++---
.../broker/pop/PopConsumerServiceTest.java | 36 ++++++++++++++++++++++
2 files changed, 43 insertions(+), 4 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
index fb371dce05..647e3d6ff7 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
@@ -496,10 +496,13 @@ public class PopConsumerService extends ServiceThread {
if (record.getAttemptTimes() <
brokerConfig.getPopReviveMaxAttemptTimes()) {
long backoffInterval = 1000L *
REWRITE_INTERVALS_IN_SECONDS[
Math.min(REWRITE_INTERVALS_IN_SECONDS.length,
record.getAttemptTimes())];
- record.setInvisibleTime(record.getInvisibleTime() +
backoffInterval);
- record.setAttemptTimes(record.getAttemptTimes() + 1);
- failureList.add(record);
- log.warn("PopConsumerService revive backoff retry,
record={}", record);
+ long nextInvisibleTime = record.getInvisibleTime() +
backoffInterval;
+ PopConsumerRecord retryRecord = new
PopConsumerRecord(record.getPopTime(), record.getGroupId(),
+ record.getTopicId(), record.getQueueId(),
record.getRetryFlag(), nextInvisibleTime,
+ record.getOffset(), record.getAttemptId());
+ retryRecord.setAttemptTimes(record.getAttemptTimes() +
1);
+ failureList.add(retryRecord);
+ log.warn("PopConsumerService revive backoff retry,
record={}", retryRecord);
} else {
log.error("PopConsumerService drop record, message may
be lost, record={}", record);
}
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java
index 5e73adb1ea..b77c170c8c 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java
@@ -385,6 +385,42 @@ public class PopConsumerServiceTest {
consumerService.shutdown();
}
+ @Test
+ public void reviveBackoffRetryTest() {
+
Mockito.when(brokerController.getEscapeBridge()).thenReturn(Mockito.mock(EscapeBridge.class));
+ PopConsumerService consumerServiceSpy = Mockito.spy(consumerService);
+
+ consumerService.getPopConsumerStore().start();
+
+ long popTime = 1000000000L;
+ long invisibleTime = 60 * 1000L;
+ PopConsumerRecord record = new PopConsumerRecord();
+ record.setPopTime(popTime);
+ record.setInvisibleTime(invisibleTime);
+ record.setTopicId("topic");
+ record.setGroupId("group");
+ record.setQueueId(0);
+ record.setOffset(0);
+
consumerService.getPopConsumerStore().writeRecords(Collections.singletonList(record));
+
+
Mockito.doReturn(CompletableFuture.completedFuture(Triple.of(Mockito.mock(MessageExt.class),
"", false)))
+
.when(consumerServiceSpy).getMessageAsync(any(PopConsumerRecord.class));
+
Mockito.when(brokerController.getEscapeBridge().putMessageToSpecificQueue(any(MessageExtBrokerInner.class))).thenReturn(
+ new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))
+ );
+
+ long visibleTimestamp = popTime + invisibleTime;
+
+ // revive fails
+ Assert.assertEquals(1, consumerServiceSpy.revive(visibleTimestamp, 1));
+ // should be invisible now
+ Assert.assertEquals(0,
consumerService.getPopConsumerStore().scanExpiredRecords(visibleTimestamp,
1).size());
+ // will be visible again in 10 seconds
+ Assert.assertEquals(1,
consumerService.getPopConsumerStore().scanExpiredRecords(visibleTimestamp + 10
* 1000, 1).size());
+
+ consumerService.shutdown();
+ }
+
@Test
public void transferToFsStoreTest() {
Assert.assertNotNull(consumerService.getServiceName());