This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new f80753f0da [ISSUE #10050] Support ChangeInvisibleTime without incrementing message reconsume times (#10051)
f80753f0da is described below
commit f80753f0dad40c383f6de69fd157fb1c79c934c6
Author: ltamber <[email protected]>
AuthorDate: Mon Feb 2 14:06:57 2026 +0800
[ISSUE #10050] Support ChangeInvisibleTime without incrementing message reconsume times (#10051)
---
.../rocketmq/broker/pop/PopConsumerRecord.java | 18 ++
.../rocketmq/broker/pop/PopConsumerService.java | 16 +-
.../processor/ChangeInvisibleTimeProcessor.java | 3 +-
.../broker/processor/PopReviveService.java | 6 +-
.../rocketmq/broker/pop/PopConsumerRecordTest.java | 185 ++++++++++++++
.../broker/pop/PopConsumerServiceTest.java | 277 ++++++++++++++++++++-
.../ChangeInvisibleTimeProcessorTest.java | 253 +++++++++++++++++++
.../ReceiveMessageResponseStreamWriter.java | 5 +-
.../proxy/processor/ConsumerProcessor.java | 17 +-
.../proxy/processor/DefaultMessagingProcessor.java | 11 +-
.../proxy/processor/MessagingProcessor.java | 13 +-
.../proxy/processor/PopMessageResultFilter.java | 3 +-
.../v2/consumer/ReceiveMessageActivityTest.java | 10 +-
.../ReceiveMessageResponseStreamWriterTest.java | 61 ++++-
.../proxy/processor/ConsumerProcessorTest.java | 106 +++++++-
.../header/ChangeInvisibleTimeRequestHeader.java | 11 +
.../apache/rocketmq/store/pop/PopCheckPoint.java | 12 +-
17 files changed, 975 insertions(+), 32 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRecord.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRecord.java
index 661ace9bcb..d10b584ef6 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRecord.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRecord.java
@@ -70,12 +70,20 @@ public class PopConsumerRecord {
@JSONField(ordinal = 8)
private String attemptId;
+ @JSONField(ordinal = 9)
+ private boolean suspend;
+
// used for test and fastjson
public PopConsumerRecord() {
}
public PopConsumerRecord(long popTime, String groupId, String topicId, int
queueId,
int retryFlag, long invisibleTime, long offset, String attemptId) {
+ this(popTime, groupId, topicId, queueId, retryFlag, invisibleTime,
offset, attemptId, false);
+ }
+
+ public PopConsumerRecord(long popTime, String groupId, String topicId, int
queueId, int retryFlag,
+ long invisibleTime, long offset, String
attemptId, boolean suspend) {
this.popTime = popTime;
this.groupId = groupId;
@@ -85,6 +93,7 @@ public class PopConsumerRecord {
this.invisibleTime = invisibleTime;
this.offset = offset;
this.attemptId = attemptId;
+ this.suspend = suspend;
}
@JSONField(serialize = false)
@@ -194,6 +203,14 @@ public class PopConsumerRecord {
this.attemptId = attemptId;
}
+ public boolean isSuspend() {
+ return suspend;
+ }
+
+ public void setSuspend(boolean suspend) {
+ this.suspend = suspend;
+ }
+
@Override
public String toString() {
return "PopDeliveryRecord{" +
@@ -206,6 +223,7 @@ public class PopConsumerRecord {
", offset=" + offset +
", attemptTimes=" + attemptTimes +
", attemptId='" + attemptId + '\'' +
+ ", suspend=" + suspend +
'}';
}
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
index a1356c2847..d76651643d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
@@ -486,8 +486,9 @@ public class PopConsumerService extends ServiceThread {
}
// refer ChangeInvisibleTimeProcessor.appendCheckPointThenAckOrigin
- public void changeInvisibilityDuration(long popTime, long invisibleTime,
- long changedPopTime, long changedInvisibleTime, String groupId, String
topicId, int queueId, long offset) {
+ public void changeInvisibilityDuration(long popTime, long invisibleTime,
long changedPopTime,
+ long changedInvisibleTime, String
groupId, String topicId,
+ int queueId, long offset, boolean
suspend) {
if (brokerConfig.isPopConsumerKVServiceLog()) {
log.info("PopConsumerService change, time={}, invisible={}, " +
@@ -496,10 +497,10 @@ public class PopConsumerService extends ServiceThread {
}
PopConsumerRecord ckRecord = new PopConsumerRecord(
- changedPopTime, groupId, topicId, queueId, 0,
changedInvisibleTime, offset, null);
+ changedPopTime, groupId, topicId, queueId, 0,
changedInvisibleTime, offset, null, suspend);
PopConsumerRecord ackRecord = new PopConsumerRecord(
- popTime, groupId, topicId, queueId, 0, invisibleTime, offset,
null);
+ popTime, groupId, topicId, queueId, 0, invisibleTime, offset,
null, suspend);
// No need to generate new records when the group does not exist,
// because these retry messages will not be consumed by anyone.
@@ -689,7 +690,12 @@ public class PopConsumerService extends ServiceThread {
msgInner.setSysFlag(messageExt.getSysFlag());
msgInner.setBornHost(brokerController.getStoreHost());
msgInner.setStoreHost(brokerController.getStoreHost());
- msgInner.setReconsumeTimes(messageExt.getReconsumeTimes() + 1);
+ if (record.isSuspend()) {
+ msgInner.setReconsumeTimes(messageExt.getReconsumeTimes());
+ } else {
+ msgInner.setReconsumeTimes(messageExt.getReconsumeTimes() + 1);
+ }
+
msgInner.getProperties().putAll(messageExt.getProperties());
// set first pop time here
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
index 133e13ccb2..a8b01ceed2 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
@@ -153,7 +153,7 @@ public class ChangeInvisibleTimeProcessor implements NettyRequestProcessor {
brokerController.getPopConsumerService().changeInvisibilityDuration(
ExtraInfoUtil.getPopTime(extraInfo),
ExtraInfoUtil.getInvisibleTime(extraInfo), current,
requestHeader.getInvisibleTime(),
requestHeader.getConsumerGroup(), requestHeader.getTopic(),
- requestHeader.getQueueId(), requestHeader.getOffset());
+ requestHeader.getQueueId(), requestHeader.getOffset(),
requestHeader.isSuspend());
responseHeader.setInvisibleTime(requestHeader.getInvisibleTime());
responseHeader.setPopTime(current);
responseHeader.setReviveQid(ExtraInfoUtil.getReviveQid(extraInfo));
@@ -324,6 +324,7 @@ public class ChangeInvisibleTimeProcessor implements NettyRequestProcessor {
ck.setQueueId(queueId);
ck.addDiff(0);
ck.setBrokerName(ExtraInfoUtil.getBrokerName(extraInfo));
+ ck.setSuspend(requestHeader.isSuspend());
msgInner.setBody(JSON.toJSONString(ck).getBytes(StandardCharsets.UTF_8));
msgInner.setQueueId(reviveQid);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
index e88879d9c6..07f16e9896 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
@@ -122,7 +122,11 @@ public class PopReviveService extends ServiceThread {
msgInner.setSysFlag(messageExt.getSysFlag());
msgInner.setBornHost(brokerController.getStoreHost());
msgInner.setStoreHost(brokerController.getStoreHost());
- msgInner.setReconsumeTimes(messageExt.getReconsumeTimes() + 1);
+ if (popCheckPoint.isSuspend()) {
+ msgInner.setReconsumeTimes(messageExt.getReconsumeTimes());
+ } else {
+ msgInner.setReconsumeTimes(messageExt.getReconsumeTimes() + 1);
+ }
msgInner.getProperties().putAll(messageExt.getProperties());
if (messageExt.getReconsumeTimes() == 0 ||
msgInner.getProperties().get(MessageConst.PROPERTY_FIRST_POP_TIME) == null) {
msgInner.getProperties().put(MessageConst.PROPERTY_FIRST_POP_TIME,
String.valueOf(popCheckPoint.getPopTime()));
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerRecordTest.java b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerRecordTest.java
index 24a79b33f3..b1a1a700c5 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerRecordTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerRecordTest.java
@@ -72,4 +72,189 @@ public class PopConsumerRecordTest {
Assert.assertEquals(0, consumerRecord2.getAttemptTimes());
Assert.assertEquals(decodeRecord.getAttemptId(),
consumerRecord2.getAttemptId());
}
+
+ @Test
+ public void testSuspendFlagInitialization() {
+ // Test constructor without suspend flag (should default to false)
+ PopConsumerRecord record1 = new PopConsumerRecord(
+ System.currentTimeMillis(), "test-group", "test-topic", 0, 0,
30000L, 100L, "attempt-id");
+ Assert.assertFalse("Suspend flag should default to false",
record1.isSuspend());
+
+ // Test constructor with suspend flag set to true
+ PopConsumerRecord record2 = new PopConsumerRecord(
+ System.currentTimeMillis(), "test-group", "test-topic", 0, 0,
30000L, 100L, "attempt-id", true);
+ Assert.assertTrue("Suspend flag should be true", record2.isSuspend());
+
+ // Test constructor with suspend flag set to false
+ PopConsumerRecord record3 = new PopConsumerRecord(
+ System.currentTimeMillis(), "test-group", "test-topic", 0, 0,
30000L, 100L, "attempt-id", false);
+ Assert.assertFalse("Suspend flag should be false",
record3.isSuspend());
+ }
+
+ @Test
+ public void testSuspendFlagSerialization() {
+ // Test serialization/deserialization with suspend flag
+ PopConsumerRecord originalRecord = new PopConsumerRecord(
+ 1234567890L, "test-group", "test-topic", 0, 0, 30000L, 100L,
"attempt-id", true);
+
+ byte[] serialized = originalRecord.getValueBytes();
+ PopConsumerRecord deserialized = PopConsumerRecord.decode(serialized);
+
+ Assert.assertTrue("Deserialized record should have suspend flag true",
deserialized.isSuspend());
+ Assert.assertEquals("Other fields should match",
originalRecord.getGroupId(), deserialized.getGroupId());
+ Assert.assertEquals("Other fields should match",
originalRecord.getTopicId(), deserialized.getTopicId());
+ Assert.assertEquals("Other fields should match",
originalRecord.getOffset(), deserialized.getOffset());
+ }
+
+ @Test
+ public void testSuspendFlagGetterSetter() {
+ PopConsumerRecord record = new PopConsumerRecord();
+
+ // Test initial value
+ Assert.assertFalse("Initial suspend value should be false",
record.isSuspend());
+
+ // Test setter
+ record.setSuspend(true);
+ Assert.assertTrue("After setting to true, should be true",
record.isSuspend());
+
+ record.setSuspend(false);
+ Assert.assertFalse("After setting to false, should be false",
record.isSuspend());
+ }
+
+ @Test
+ public void testSuspendInToString() {
+ PopConsumerRecord record = new PopConsumerRecord(
+ 1234567890L, "test-group", "test-topic", 0, 0, 30000L, 100L,
"attempt-id", true);
+
+ String toString = record.toString();
+ Assert.assertTrue("toString should include suspend information",
toString.contains("suspend=true"));
+
+ PopConsumerRecord record2 = new PopConsumerRecord(
+ 1234567890L, "test-group", "test-topic", 0, 0, 30000L, 100L,
"attempt-id", false);
+
+ String toString2 = record2.toString();
+ Assert.assertTrue("toString should include suspend information",
toString2.contains("suspend=false"));
+ }
+
+ @Test
+ public void testSuspendFlagSerializationWithFalse() {
+ // Test serialization/deserialization with suspend flag set to false
+ PopConsumerRecord originalRecord = new PopConsumerRecord(
+ 1234567890L, "test-group", "test-topic", 0, 0, 30000L, 100L,
"attempt-id", false);
+
+ byte[] serialized = originalRecord.getValueBytes();
+ PopConsumerRecord deserialized = PopConsumerRecord.decode(serialized);
+
+ Assert.assertFalse("Deserialized record should have suspend flag
false", deserialized.isSuspend());
+ Assert.assertEquals("GroupId should match",
originalRecord.getGroupId(), deserialized.getGroupId());
+ Assert.assertEquals("TopicId should match",
originalRecord.getTopicId(), deserialized.getTopicId());
+ Assert.assertEquals("Offset should match", originalRecord.getOffset(),
deserialized.getOffset());
+ Assert.assertEquals("PopTime should match",
originalRecord.getPopTime(), deserialized.getPopTime());
+ Assert.assertEquals("QueueId should match",
originalRecord.getQueueId(), deserialized.getQueueId());
+ Assert.assertEquals("InvisibleTime should match",
originalRecord.getInvisibleTime(), deserialized.getInvisibleTime());
+ Assert.assertEquals("RetryFlag should match",
originalRecord.getRetryFlag(), deserialized.getRetryFlag());
+ Assert.assertEquals("AttemptId should match",
originalRecord.getAttemptId(), deserialized.getAttemptId());
+ }
+
+ @Test
+ public void testSuspendFlagJSONSerializationCompleteness() {
+ // Test complete serialization/deserialization with all fields including suspend
+ long popTime = System.currentTimeMillis();
+ String groupId = "test-group";
+ String topicId = "test-topic";
+ int queueId = 1;
+ int retryFlag = PopConsumerRecord.RetryType.RETRY_TOPIC_V2.getCode();
+ long invisibleTime = 30000L;
+ long offset = 100L;
+ String attemptId = UUID.randomUUID().toString().toUpperCase();
+
+ // Test with suspend = true
+ PopConsumerRecord recordWithSuspend = new PopConsumerRecord(
+ popTime, groupId, topicId, queueId, retryFlag, invisibleTime,
offset, attemptId, true);
+ recordWithSuspend.setAttemptTimes(3);
+
+ byte[] serialized = recordWithSuspend.getValueBytes();
+ PopConsumerRecord deserialized = PopConsumerRecord.decode(serialized);
+
+ Assert.assertTrue("Suspend flag should be true",
deserialized.isSuspend());
+ Assert.assertEquals("PopTime should match", popTime,
deserialized.getPopTime());
+ Assert.assertEquals("GroupId should match", groupId,
deserialized.getGroupId());
+ Assert.assertEquals("TopicId should match", topicId,
deserialized.getTopicId());
+ Assert.assertEquals("QueueId should match", queueId,
deserialized.getQueueId());
+ Assert.assertEquals("RetryFlag should match", retryFlag,
deserialized.getRetryFlag());
+ Assert.assertEquals("InvisibleTime should match", invisibleTime,
deserialized.getInvisibleTime());
+ Assert.assertEquals("Offset should match", offset,
deserialized.getOffset());
+ Assert.assertEquals("AttemptTimes should match", 3,
deserialized.getAttemptTimes());
+ Assert.assertEquals("AttemptId should match", attemptId,
deserialized.getAttemptId());
+
+ // Test with suspend = false
+ PopConsumerRecord recordWithoutSuspend = new PopConsumerRecord(
+ popTime, groupId, topicId, queueId, retryFlag, invisibleTime,
offset, attemptId, false);
+ recordWithoutSuspend.setAttemptTimes(3);
+
+ serialized = recordWithoutSuspend.getValueBytes();
+ deserialized = PopConsumerRecord.decode(serialized);
+
+ Assert.assertFalse("Suspend flag should be false",
deserialized.isSuspend());
+ Assert.assertEquals("PopTime should match", popTime,
deserialized.getPopTime());
+ Assert.assertEquals("GroupId should match", groupId,
deserialized.getGroupId());
+ Assert.assertEquals("TopicId should match", topicId,
deserialized.getTopicId());
+ Assert.assertEquals("QueueId should match", queueId,
deserialized.getQueueId());
+ Assert.assertEquals("RetryFlag should match", retryFlag,
deserialized.getRetryFlag());
+ Assert.assertEquals("InvisibleTime should match", invisibleTime,
deserialized.getInvisibleTime());
+ Assert.assertEquals("Offset should match", offset,
deserialized.getOffset());
+ Assert.assertEquals("AttemptTimes should match", 3,
deserialized.getAttemptTimes());
+ Assert.assertEquals("AttemptId should match", attemptId,
deserialized.getAttemptId());
+ }
+
+ @Test
+ public void testSuspendFlagDefaultValueInNoArgConstructor() {
+ // Test that no-arg constructor defaults suspend to false
+ PopConsumerRecord record = new PopConsumerRecord();
+ Assert.assertFalse("No-arg constructor should default suspend to
false", record.isSuspend());
+
+ // Set all fields manually
+ record.setPopTime(System.currentTimeMillis());
+ record.setGroupId("test-group");
+ record.setTopicId("test-topic");
+ record.setQueueId(0);
+ record.setRetryFlag(0);
+ record.setInvisibleTime(30000L);
+ record.setOffset(100L);
+ record.setAttemptId("attempt-id");
+ record.setSuspend(true);
+
+ Assert.assertTrue("After setting suspend to true, should be true",
record.isSuspend());
+
+ // Serialize and deserialize to verify
+ byte[] serialized = record.getValueBytes();
+ PopConsumerRecord deserialized = PopConsumerRecord.decode(serialized);
+ Assert.assertTrue("Deserialized record should preserve suspend=true",
deserialized.isSuspend());
+ }
+
+ @Test
+ public void testSuspendFlagInDeliveryRecordSerializeTest() {
+ // Enhance existing deliveryRecordSerializeTest to include suspend flag
+ PopConsumerRecord consumerRecord = new PopConsumerRecord();
+ consumerRecord.setPopTime(System.currentTimeMillis());
+ consumerRecord.setGroupId("GroupId");
+ consumerRecord.setTopicId("TopicId");
+ consumerRecord.setQueueId(3);
+
consumerRecord.setRetryFlag(PopConsumerRecord.RetryType.RETRY_TOPIC_V1.getCode());
+ consumerRecord.setInvisibleTime(20);
+ consumerRecord.setOffset(100);
+ consumerRecord.setAttemptTimes(2);
+
consumerRecord.setAttemptId(UUID.randomUUID().toString().toUpperCase());
+ consumerRecord.setSuspend(true);
+
+ PopConsumerRecord decodeRecord =
PopConsumerRecord.decode(consumerRecord.getValueBytes());
+ Assert.assertTrue("Decoded record should preserve suspend flag",
decodeRecord.isSuspend());
+ Assert.assertEquals("Suspend flag should match",
consumerRecord.isSuspend(), decodeRecord.isSuspend());
+
+ // Test with suspend = false
+ consumerRecord.setSuspend(false);
+ decodeRecord =
PopConsumerRecord.decode(consumerRecord.getValueBytes());
+ Assert.assertFalse("Decoded record should preserve suspend=false",
decodeRecord.isSuspend());
+ Assert.assertEquals("Suspend flag should match",
consumerRecord.isSuspend(), decodeRecord.isSuspend());
+ }
}
\ No newline at end of file
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java
index 69cadb3de2..089d2c1f22 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java
@@ -62,6 +62,7 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import static org.mockito.ArgumentMatchers.any;
@@ -320,7 +321,7 @@ public class PopConsumerServiceTest {
consumerService.ackAsync(
current, 10, groupId, topicId, queueId, 100).join();
consumerService.changeInvisibilityDuration(current, 10,
- current + 100, 10, groupId, topicId, queueId, 100);
+ current + 100, 10, groupId, topicId, queueId, 100, false);
consumerService.shutdown();
}
@@ -468,4 +469,278 @@ public class PopConsumerServiceTest {
consumerService.transferToFsStore();
consumerService.shutdown();
}
+
+ @Test
+ public void testChangeInvisibilityDurationWithSuspendTrue() {
+ long current = System.currentTimeMillis();
+ long popTime = current - 1000;
+ long invisibleTime = 10000;
+ long changedPopTime = current;
+ long changedInvisibleTime = 20000;
+ long offset = 100L;
+
+ consumerService.getPopConsumerStore().start();
+
Mockito.when(brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(groupId)).thenReturn(true);
+
+ // Test with suspend = true
+ consumerService.changeInvisibilityDuration(popTime, invisibleTime,
changedPopTime,
+ changedInvisibleTime, groupId, topicId, queueId, offset, true);
+
+ // Verify that the record was written with suspend = true
+ List<PopConsumerRecord> records = consumerService.getPopConsumerStore()
+ .scanExpiredRecords(0, changedPopTime + changedInvisibleTime +
1000, 10);
+ Assert.assertFalse("Should have at least one record",
records.isEmpty());
+ PopConsumerRecord ckRecord = records.stream()
+ .filter(r -> r.getOffset() == offset && r.getPopTime() ==
changedPopTime)
+ .findFirst()
+ .orElse(null);
+ Assert.assertNotNull("Should find the checkpoint record", ckRecord);
+ Assert.assertTrue("Suspend flag should be true", ckRecord.isSuspend());
+ Assert.assertEquals("GroupId should match", groupId,
ckRecord.getGroupId());
+ Assert.assertEquals("TopicId should match", topicId,
ckRecord.getTopicId());
+ Assert.assertEquals("QueueId should match", queueId,
ckRecord.getQueueId());
+ Assert.assertEquals("Offset should match", offset,
ckRecord.getOffset());
+
+ consumerService.shutdown();
+ }
+
+ @Test
+ public void testChangeInvisibilityDurationWithSuspendFalse() {
+ long current = System.currentTimeMillis();
+ long popTime = current - 1000;
+ long invisibleTime = 10000;
+ long changedPopTime = current;
+ long changedInvisibleTime = 20000;
+ long offset = 200L;
+
+ consumerService.getPopConsumerStore().start();
+
Mockito.when(brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(groupId)).thenReturn(true);
+
+ // Test with suspend = false
+ consumerService.changeInvisibilityDuration(popTime, invisibleTime,
changedPopTime,
+ changedInvisibleTime, groupId, topicId, queueId, offset, false);
+
+ // Verify that the record was written with suspend = false
+ List<PopConsumerRecord> records = consumerService.getPopConsumerStore()
+ .scanExpiredRecords(0, changedPopTime + changedInvisibleTime +
1000, 10);
+ Assert.assertFalse("Should have at least one record",
records.isEmpty());
+ PopConsumerRecord ckRecord = records.stream()
+ .filter(r -> r.getOffset() == offset && r.getPopTime() ==
changedPopTime)
+ .findFirst()
+ .orElse(null);
+ Assert.assertNotNull("Should find the checkpoint record", ckRecord);
+ Assert.assertFalse("Suspend flag should be false",
ckRecord.isSuspend());
+ Assert.assertEquals("GroupId should match", groupId,
ckRecord.getGroupId());
+ Assert.assertEquals("TopicId should match", topicId,
ckRecord.getTopicId());
+ Assert.assertEquals("QueueId should match", queueId,
ckRecord.getQueueId());
+ Assert.assertEquals("Offset should match", offset,
ckRecord.getOffset());
+
+ consumerService.shutdown();
+ }
+
+ @Test
+ public void testReviveRetryWithSuspendTrue() {
+
Mockito.when(brokerController.getTopicConfigManager().selectTopicConfig(topicId)).thenReturn(null);
+
Mockito.when(brokerController.getConsumerOffsetManager().queryOffset(groupId,
topicId, 0)).thenReturn(-1L);
+
+ consumerService.createRetryTopicIfNeeded(groupId, topicId);
+ consumerService.clearCache(groupId, topicId, queueId);
+
+ // Create message with reconsumeTimes = 2
+ MessageExt messageExt = new MessageExt();
+ messageExt.setBody("body".getBytes());
+ messageExt.setBornTimestamp(System.currentTimeMillis());
+ messageExt.setFlag(0);
+ messageExt.setSysFlag(0);
+ messageExt.setReconsumeTimes(2);
+ messageExt.putUserProperty("key", "value");
+
+ // Create record with suspend = true
+ PopConsumerRecord record = new PopConsumerRecord();
+ record.setTopicId(topicId);
+ record.setGroupId(groupId);
+ record.setQueueId(queueId);
+ record.setPopTime(System.currentTimeMillis());
+ record.setInvisibleTime(30000);
+ record.setOffset(100L);
+ record.setSuspend(true);
+
+
Mockito.when(brokerController.getBrokerStatsManager()).thenReturn(Mockito.mock(BrokerStatsManager.class));
+ EscapeBridge escapeBridge = Mockito.mock(EscapeBridge.class);
+
Mockito.when(brokerController.getEscapeBridge()).thenReturn(escapeBridge);
+
+ // Capture the MessageExtBrokerInner to verify reconsumeTimes
+ ArgumentCaptor<MessageExtBrokerInner> messageCaptor =
+ ArgumentCaptor.forClass(MessageExtBrokerInner.class);
+
Mockito.when(escapeBridge.putMessageToSpecificQueue(messageCaptor.capture()))
+ .thenReturn(new PutMessageResult(
+ PutMessageStatus.PUT_OK, new
AppendMessageResult(AppendMessageStatus.PUT_OK)));
+
+ PopConsumerService consumerServiceSpy = Mockito.spy(consumerService);
+
Mockito.doNothing().when(consumerServiceSpy).createRetryTopicIfNeeded(any(),
any());
+ Assert.assertTrue("Revive should succeed",
consumerServiceSpy.reviveRetry(record, messageExt));
+
+ // Verify that reconsumeTimes was NOT incremented (should remain 2)
+ MessageExtBrokerInner capturedMessage = messageCaptor.getValue();
+ Assert.assertNotNull("Message should be captured", capturedMessage);
+ Assert.assertEquals("ReconsumeTimes should remain 2 when
suspend=true", 2, capturedMessage.getReconsumeTimes());
+ }
+
+ @Test
+ public void testReviveRetryWithSuspendFalse() {
+
Mockito.when(brokerController.getTopicConfigManager().selectTopicConfig(topicId)).thenReturn(null);
+
Mockito.when(brokerController.getConsumerOffsetManager().queryOffset(groupId,
topicId, 0)).thenReturn(-1L);
+
+ consumerService.createRetryTopicIfNeeded(groupId, topicId);
+ consumerService.clearCache(groupId, topicId, queueId);
+
+ // Create message with reconsumeTimes = 2
+ MessageExt messageExt = new MessageExt();
+ messageExt.setBody("body".getBytes());
+ messageExt.setBornTimestamp(System.currentTimeMillis());
+ messageExt.setFlag(0);
+ messageExt.setSysFlag(0);
+ messageExt.setReconsumeTimes(2);
+ messageExt.putUserProperty("key", "value");
+
+ // Create record with suspend = false
+ PopConsumerRecord record = new PopConsumerRecord();
+ record.setTopicId(topicId);
+ record.setGroupId(groupId);
+ record.setQueueId(queueId);
+ record.setPopTime(System.currentTimeMillis());
+ record.setInvisibleTime(30000);
+ record.setOffset(200L);
+ record.setSuspend(false);
+
+
Mockito.when(brokerController.getBrokerStatsManager()).thenReturn(Mockito.mock(BrokerStatsManager.class));
+ EscapeBridge escapeBridge = Mockito.mock(EscapeBridge.class);
+
Mockito.when(brokerController.getEscapeBridge()).thenReturn(escapeBridge);
+
+ // Capture the MessageExtBrokerInner to verify reconsumeTimes
+ ArgumentCaptor<MessageExtBrokerInner> messageCaptor =
+ ArgumentCaptor.forClass(MessageExtBrokerInner.class);
+
Mockito.when(escapeBridge.putMessageToSpecificQueue(messageCaptor.capture()))
+ .thenReturn(new PutMessageResult(
+ PutMessageStatus.PUT_OK, new
AppendMessageResult(AppendMessageStatus.PUT_OK)));
+
+ PopConsumerService consumerServiceSpy = Mockito.spy(consumerService);
+
Mockito.doNothing().when(consumerServiceSpy).createRetryTopicIfNeeded(any(),
any());
+ Assert.assertTrue("Revive should succeed",
consumerServiceSpy.reviveRetry(record, messageExt));
+
+ // Verify that reconsumeTimes was incremented (should be 3)
+ MessageExtBrokerInner capturedMessage = messageCaptor.getValue();
+ Assert.assertNotNull("Message should be captured", capturedMessage);
+ Assert.assertEquals("ReconsumeTimes should be incremented to 3 when
suspend=false", 3, capturedMessage.getReconsumeTimes());
+ }
+
+ @Test
+ public void testReviveRetryWithSuspendTrueMultipleTimes() {
+
Mockito.when(brokerController.getTopicConfigManager().selectTopicConfig(topicId)).thenReturn(null);
+
Mockito.when(brokerController.getConsumerOffsetManager().queryOffset(groupId,
topicId, 0)).thenReturn(-1L);
+
+ consumerService.createRetryTopicIfNeeded(groupId, topicId);
+ consumerService.clearCache(groupId, topicId, queueId);
+
+ // Create message with reconsumeTimes = 0
+ MessageExt messageExt = new MessageExt();
+ messageExt.setBody("body".getBytes());
+ messageExt.setBornTimestamp(System.currentTimeMillis());
+ messageExt.setFlag(0);
+ messageExt.setSysFlag(0);
+ messageExt.setReconsumeTimes(0);
+ messageExt.putUserProperty("key", "value");
+
+
Mockito.when(brokerController.getBrokerStatsManager()).thenReturn(Mockito.mock(BrokerStatsManager.class));
+ EscapeBridge escapeBridge = Mockito.mock(EscapeBridge.class);
+
Mockito.when(brokerController.getEscapeBridge()).thenReturn(escapeBridge);
+
+ PopConsumerService consumerServiceSpy = Mockito.spy(consumerService);
+
Mockito.doNothing().when(consumerServiceSpy).createRetryTopicIfNeeded(any(),
any());
+
+ // Simulate multiple nacks with suspend = true
+ for (int i = 0; i < 3; i++) {
+ PopConsumerRecord record = new PopConsumerRecord();
+ record.setTopicId(topicId);
+ record.setGroupId(groupId);
+ record.setQueueId(queueId);
+ record.setPopTime(System.currentTimeMillis());
+ record.setInvisibleTime(30000);
+ record.setOffset(300L + i);
+ record.setSuspend(true);
+
+ // Capture the MessageExtBrokerInner to verify reconsumeTimes
+ org.mockito.ArgumentCaptor<MessageExtBrokerInner> messageCaptor =
+
org.mockito.ArgumentCaptor.forClass(MessageExtBrokerInner.class);
+
Mockito.when(escapeBridge.putMessageToSpecificQueue(messageCaptor.capture()))
+ .thenReturn(new PutMessageResult(
+ PutMessageStatus.PUT_OK, new
AppendMessageResult(AppendMessageStatus.PUT_OK)));
+
+ Assert.assertTrue("Revive should succeed",
consumerServiceSpy.reviveRetry(record, messageExt));
+
+ // Verify that reconsumeTimes remains 0 (not incremented)
+ MessageExtBrokerInner capturedMessage = messageCaptor.getValue();
+ Assert.assertNotNull("Message should be captured",
capturedMessage);
+ Assert.assertEquals("ReconsumeTimes should remain 0 after " + (i +
1) + " nacks with suspend=true",
+ 0, capturedMessage.getReconsumeTimes());
+
+ // Update messageExt for next iteration (simulate the message being re-consumed)
+ messageExt.setReconsumeTimes(capturedMessage.getReconsumeTimes());
+ }
+ }
+
+ @Test
+ public void testReviveRetryWithSuspendFalseMultipleTimes() {
+
Mockito.when(brokerController.getTopicConfigManager().selectTopicConfig(topicId)).thenReturn(null);
+
Mockito.when(brokerController.getConsumerOffsetManager().queryOffset(groupId,
topicId, 0)).thenReturn(-1L);
+
+ consumerService.createRetryTopicIfNeeded(groupId, topicId);
+ consumerService.clearCache(groupId, topicId, queueId);
+
+ // Create message with reconsumeTimes = 0
+ MessageExt messageExt = new MessageExt();
+ messageExt.setBody("body".getBytes());
+ messageExt.setBornTimestamp(System.currentTimeMillis());
+ messageExt.setFlag(0);
+ messageExt.setSysFlag(0);
+ messageExt.setReconsumeTimes(0);
+ messageExt.putUserProperty("key", "value");
+
+
Mockito.when(brokerController.getBrokerStatsManager()).thenReturn(Mockito.mock(BrokerStatsManager.class));
+ EscapeBridge escapeBridge = Mockito.mock(EscapeBridge.class);
+
Mockito.when(brokerController.getEscapeBridge()).thenReturn(escapeBridge);
+
+ PopConsumerService consumerServiceSpy = Mockito.spy(consumerService);
+
Mockito.doNothing().when(consumerServiceSpy).createRetryTopicIfNeeded(any(),
any());
+
+ // Simulate multiple nacks with suspend = false
+ for (int i = 0; i < 3; i++) {
+ PopConsumerRecord record = new PopConsumerRecord();
+ record.setTopicId(topicId);
+ record.setGroupId(groupId);
+ record.setQueueId(queueId);
+ record.setPopTime(System.currentTimeMillis());
+ record.setInvisibleTime(30000);
+ record.setOffset(400L + i);
+ record.setSuspend(false);
+
+ // Capture the MessageExtBrokerInner to verify reconsumeTimes
+ org.mockito.ArgumentCaptor<MessageExtBrokerInner> messageCaptor =
+
org.mockito.ArgumentCaptor.forClass(MessageExtBrokerInner.class);
+
Mockito.when(escapeBridge.putMessageToSpecificQueue(messageCaptor.capture()))
+ .thenReturn(new PutMessageResult(
+ PutMessageStatus.PUT_OK, new
AppendMessageResult(AppendMessageStatus.PUT_OK)));
+
+ Assert.assertTrue("Revive should succeed",
consumerServiceSpy.reviveRetry(record, messageExt));
+
+ // Verify that reconsumeTimes is incremented each time
+ MessageExtBrokerInner capturedMessage = messageCaptor.getValue();
+ Assert.assertNotNull("Message should be captured",
capturedMessage);
+ Assert.assertEquals("ReconsumeTimes should be " + (i + 1) + "
after " + (i + 1) + " nacks with suspend=false",
+ i + 1, capturedMessage.getReconsumeTimes());
+
+ // Update messageExt for next iteration (simulate the message being re-consumed)
+ messageExt.setReconsumeTimes(capturedMessage.getReconsumeTimes());
+ }
+ }
}
\ No newline at end of file
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
index 7afd338dca..75ce68f4bd 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
@@ -25,7 +25,9 @@ import org.apache.rocketmq.broker.failover.EscapeBridge;
import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
import org.apache.rocketmq.broker.metrics.PopMetricsManager;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
+import com.alibaba.fastjson2.JSON;
import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.PopAckConstants;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
@@ -46,16 +48,19 @@ import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.pop.PopCheckPoint;
import org.apache.rocketmq.store.exception.ConsumeQueueException;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
import java.lang.reflect.Field;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -250,4 +255,252 @@ public class ChangeInvisibleTimeProcessorTest {
assertNotNull(response);
assertEquals(ResponseCode.SUCCESS, response.getCode());
}
+
+ @Test // Runs processRequestAsync with suspend=true against fully mocked broker collaborators and expects a SUCCESS response.
+ public void testProcessRequestAsyncWithSuspendTrue() throws Exception {
+ // Setup mocks
+ Channel mockChannel = mock(Channel.class);
+ RemotingCommand mockRequest = mock(RemotingCommand.class);
+ BrokerController mockBrokerController = mock(BrokerController.class);
+ TopicConfigManager mockTopicConfigManager =
mock(TopicConfigManager.class);
+ MessageStore mockMessageStore = mock(MessageStore.class);
+ BrokerConfig mockBrokerConfig = mock(BrokerConfig.class);
+ BrokerStatsManager mockBrokerStatsManager =
mock(BrokerStatsManager.class);
+ PopMessageProcessor mockPopMessageProcessor =
mock(PopMessageProcessor.class);
+ PopBufferMergeService mockPopBufferMergeService =
mock(PopBufferMergeService.class);
+ BrokerMetricsManager brokerMetricsManager =
mock(BrokerMetricsManager.class);
+ PopMetricsManager popMetricsManager = mock(PopMetricsManager.class);
+ EscapeBridge mockEscapeBridge = mock(EscapeBridge.class);
+
+
when(brokerMetricsManager.getPopMetricsManager()).thenReturn(popMetricsManager);
+
when(mockBrokerController.getBrokerMetricsManager()).thenReturn(brokerMetricsManager);
+ doNothing().when(popMetricsManager).incPopReviveCkPutCount(any(),
any());
+
when(mockBrokerController.getTopicConfigManager()).thenReturn(mockTopicConfigManager);
+
when(mockBrokerController.getMessageStore()).thenReturn(mockMessageStore);
+
when(mockBrokerController.getBrokerConfig()).thenReturn(mockBrokerConfig);
+
when(mockBrokerController.getBrokerStatsManager()).thenReturn(mockBrokerStatsManager);
+
when(mockBrokerController.getPopMessageProcessor()).thenReturn(mockPopMessageProcessor);
+
when(mockPopMessageProcessor.getPopBufferMergeService()).thenReturn(mockPopBufferMergeService);
+ when(mockPopBufferMergeService.addAk(anyInt(),
any())).thenReturn(false); // stubbed to return false; NOTE(review): presumably makes the processor fall through to the escape-bridge put stubbed below - confirm
+
when(mockBrokerController.getEscapeBridge()).thenReturn(mockEscapeBridge);
+
+ PutMessageResult mockPutMessageResult = new
PutMessageResult(PutMessageStatus.PUT_OK, null, true); // PUT_OK with a null AppendMessageResult; NOTE(review): third argument is presumably the remote-put flag - confirm
+ when(mockEscapeBridge.asyncPutMessageToSpecificQueue(any()))
+
.thenReturn(CompletableFuture.completedFuture(mockPutMessageResult)); // every async put completes immediately with the result above
+
+ TopicConfig topicConfig = new TopicConfig();
+ topicConfig.setReadQueueNums(4);
+
when(mockTopicConfigManager.selectTopicConfig(anyString())).thenReturn(topicConfig);
+ when(mockMessageStore.getMinOffsetInQueue(anyString(),
anyInt())).thenReturn(0L);
+ when(mockMessageStore.getMaxOffsetInQueue(anyString(),
anyInt())).thenReturn(10L); // request offset 5 (set below) lies within the stubbed [0, 10] range
+
when(mockBrokerConfig.isPopConsumerKVServiceEnable()).thenReturn(false); // KV-based pop consumer service is reported as disabled for this test
+
+ ChangeInvisibleTimeRequestHeader requestHeader = new
ChangeInvisibleTimeRequestHeader();
+ requestHeader.setTopic("TestTopic");
+ requestHeader.setQueueId(1); // queue 1 is below the 4 read queues configured on topicConfig
+ requestHeader.setOffset(5L);
+ requestHeader.setConsumerGroup("TestGroup");
+ requestHeader.setExtraInfo("0 10000 10000 0 TestBroker 1"); // NOTE(review): space-separated fields, presumably in the layout ExtraInfoUtil produces - confirm
+ requestHeader.setInvisibleTime(60000L);
+ requestHeader.setSuspend(true); // Test with suspend=true
+
when(mockRequest.decodeCommandCustomHeader(ChangeInvisibleTimeRequestHeader.class)).thenReturn(requestHeader); // the mocked command hands back the header built above instead of decoding bytes
+
+ ChangeInvisibleTimeProcessor processor = new
ChangeInvisibleTimeProcessor(mockBrokerController);
+ CompletableFuture<RemotingCommand> futureResponse =
processor.processRequestAsync(mockChannel, mockRequest, true);
+
+ RemotingCommand response = futureResponse.get();
+ assertNotNull(response);
+ assertEquals(ResponseCode.SUCCESS, response.getCode()); // only the response code is asserted; the suspend flag itself is not inspected in this test
+ }
+
+ @Test // Runs processRequestAsync with suspend=false against fully mocked broker collaborators and expects a SUCCESS response.
+ public void testProcessRequestAsyncWithSuspendFalse() throws Exception {
+ // Setup mocks
+ Channel mockChannel = mock(Channel.class);
+ RemotingCommand mockRequest = mock(RemotingCommand.class);
+ BrokerController mockBrokerController = mock(BrokerController.class);
+ TopicConfigManager mockTopicConfigManager =
mock(TopicConfigManager.class);
+ MessageStore mockMessageStore = mock(MessageStore.class);
+ BrokerConfig mockBrokerConfig = mock(BrokerConfig.class);
+ BrokerStatsManager mockBrokerStatsManager =
mock(BrokerStatsManager.class);
+ PopMessageProcessor mockPopMessageProcessor =
mock(PopMessageProcessor.class);
+ PopBufferMergeService mockPopBufferMergeService =
mock(PopBufferMergeService.class);
+ BrokerMetricsManager brokerMetricsManager =
mock(BrokerMetricsManager.class);
+ PopMetricsManager popMetricsManager = mock(PopMetricsManager.class);
+ EscapeBridge mockEscapeBridge = mock(EscapeBridge.class);
+
+
when(brokerMetricsManager.getPopMetricsManager()).thenReturn(popMetricsManager);
+
when(mockBrokerController.getBrokerMetricsManager()).thenReturn(brokerMetricsManager);
+ doNothing().when(popMetricsManager).incPopReviveCkPutCount(any(),
any());
+
when(mockBrokerController.getTopicConfigManager()).thenReturn(mockTopicConfigManager);
+
when(mockBrokerController.getMessageStore()).thenReturn(mockMessageStore);
+
when(mockBrokerController.getBrokerConfig()).thenReturn(mockBrokerConfig);
+
when(mockBrokerController.getBrokerStatsManager()).thenReturn(mockBrokerStatsManager);
+
when(mockBrokerController.getPopMessageProcessor()).thenReturn(mockPopMessageProcessor);
+
when(mockPopMessageProcessor.getPopBufferMergeService()).thenReturn(mockPopBufferMergeService);
+ when(mockPopBufferMergeService.addAk(anyInt(),
any())).thenReturn(false); // stubbed to return false; NOTE(review): presumably makes the processor fall through to the escape-bridge put stubbed below - confirm
+
when(mockBrokerController.getEscapeBridge()).thenReturn(mockEscapeBridge);
+
+ PutMessageResult mockPutMessageResult = new
PutMessageResult(PutMessageStatus.PUT_OK, null, true); // PUT_OK with a null AppendMessageResult; NOTE(review): third argument is presumably the remote-put flag - confirm
+ when(mockEscapeBridge.asyncPutMessageToSpecificQueue(any()))
+
.thenReturn(CompletableFuture.completedFuture(mockPutMessageResult)); // every async put completes immediately with the result above
+
+ TopicConfig topicConfig = new TopicConfig();
+ topicConfig.setReadQueueNums(4);
+
when(mockTopicConfigManager.selectTopicConfig(anyString())).thenReturn(topicConfig);
+ when(mockMessageStore.getMinOffsetInQueue(anyString(),
anyInt())).thenReturn(0L);
+ when(mockMessageStore.getMaxOffsetInQueue(anyString(),
anyInt())).thenReturn(10L); // request offset 5 (set below) lies within the stubbed [0, 10] range
+
when(mockBrokerConfig.isPopConsumerKVServiceEnable()).thenReturn(false); // KV-based pop consumer service is reported as disabled for this test
+
+ ChangeInvisibleTimeRequestHeader requestHeader = new
ChangeInvisibleTimeRequestHeader();
+ requestHeader.setTopic("TestTopic");
+ requestHeader.setQueueId(1); // queue 1 is below the 4 read queues configured on topicConfig
+ requestHeader.setOffset(5L);
+ requestHeader.setConsumerGroup("TestGroup");
+ requestHeader.setExtraInfo("0 10000 10000 0 TestBroker 1"); // NOTE(review): space-separated fields, presumably in the layout ExtraInfoUtil produces - confirm
+ requestHeader.setInvisibleTime(60000L);
+ requestHeader.setSuspend(false); // Test with suspend=false
+
when(mockRequest.decodeCommandCustomHeader(ChangeInvisibleTimeRequestHeader.class)).thenReturn(requestHeader); // the mocked command hands back the header built above instead of decoding bytes
+
+ ChangeInvisibleTimeProcessor processor = new
ChangeInvisibleTimeProcessor(mockBrokerController);
+ CompletableFuture<RemotingCommand> futureResponse =
processor.processRequestAsync(mockChannel, mockRequest, true);
+
+ RemotingCommand response = futureResponse.get();
+ assertNotNull(response);
+ assertEquals(ResponseCode.SUCCESS, response.getCode()); // only the response code is asserted; the suspend flag itself is not inspected in this test
+ }
+
+ @Test // Sends a CHANGE_MESSAGE_INVISIBLETIME request with suspend=true through processRequest and expects SUCCESS with a matching opaque id.
+ public void testProcessRequestWithSuspendTrue() throws
RemotingCommandException, ConsumeQueueException {
+ when(messageStore.getMaxOffsetInQueue(anyString(),
anyInt())).thenReturn(2L); // queueOffset 0 used below is under the stubbed max offset
+
when(escapeBridge.asyncPutMessageToSpecificQueue(any(MessageExtBrokerInner.class))).thenReturn(CompletableFuture.completedFuture(new
PutMessageResult(PutMessageStatus.PUT_OK, new
AppendMessageResult(AppendMessageStatus.PUT_OK)))); // every async put completes immediately with PUT_OK
+ int queueId = 0;
+ long queueOffset = 0;
+ long popTime = System.currentTimeMillis() - 1_000; // pop happened one second ago
+ long invisibleTime = 30_000;
+ int reviveQid = 0;
+ String brokerName = "test_broker";
+ String extraInfo = ExtraInfoUtil.buildExtraInfo(queueOffset, popTime,
invisibleTime, reviveQid,
+ topic, brokerName, queueId) + MessageConst.KEY_SEPARATOR +
queueOffset; // queueOffset is appended once more after a KEY_SEPARATOR
+
+ ChangeInvisibleTimeRequestHeader requestHeader = new
ChangeInvisibleTimeRequestHeader();
+ requestHeader.setTopic(topic);
+ requestHeader.setQueueId(queueId);
+ requestHeader.setOffset(queueOffset);
+ requestHeader.setConsumerGroup(group);
+ requestHeader.setExtraInfo(extraInfo);
+ requestHeader.setInvisibleTime(invisibleTime);
+ requestHeader.setSuspend(true); // Set suspend to true
+
+ final RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.CHANGE_MESSAGE_INVISIBLETIME,
requestHeader);
+ request.makeCustomHeaderToNet(); // NOTE(review): presumably serializes the header into the command so the processor can decode it - confirm
+ RemotingCommand responseToReturn =
changeInvisibleTimeProcessor.processRequest(handlerContext, request);
+ assertThat(responseToReturn.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
assertThat(responseToReturn.getOpaque()).isEqualTo(request.getOpaque()); // the response must echo the request's opaque id
+ }
+
+ @Test // Sends a CHANGE_MESSAGE_INVISIBLETIME request with suspend=false through processRequest and expects SUCCESS with a matching opaque id.
+ public void testProcessRequestWithSuspendFalse() throws
RemotingCommandException, ConsumeQueueException {
+ when(messageStore.getMaxOffsetInQueue(anyString(),
anyInt())).thenReturn(2L); // queueOffset 0 used below is under the stubbed max offset
+
when(escapeBridge.asyncPutMessageToSpecificQueue(any(MessageExtBrokerInner.class))).thenReturn(CompletableFuture.completedFuture(new
PutMessageResult(PutMessageStatus.PUT_OK, new
AppendMessageResult(AppendMessageStatus.PUT_OK)))); // every async put completes immediately with PUT_OK
+ int queueId = 0;
+ long queueOffset = 0;
+ long popTime = System.currentTimeMillis() - 1_000; // pop happened one second ago
+ long invisibleTime = 30_000;
+ int reviveQid = 0;
+ String brokerName = "test_broker";
+ String extraInfo = ExtraInfoUtil.buildExtraInfo(queueOffset, popTime,
invisibleTime, reviveQid,
+ topic, brokerName, queueId) + MessageConst.KEY_SEPARATOR +
queueOffset; // queueOffset is appended once more after a KEY_SEPARATOR
+
+ ChangeInvisibleTimeRequestHeader requestHeader = new
ChangeInvisibleTimeRequestHeader();
+ requestHeader.setTopic(topic);
+ requestHeader.setQueueId(queueId);
+ requestHeader.setOffset(queueOffset);
+ requestHeader.setConsumerGroup(group);
+ requestHeader.setExtraInfo(extraInfo);
+ requestHeader.setInvisibleTime(invisibleTime);
+ requestHeader.setSuspend(false); // Set suspend to false
+
+ final RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.CHANGE_MESSAGE_INVISIBLETIME,
requestHeader);
+ request.makeCustomHeaderToNet(); // NOTE(review): presumably serializes the header into the command so the processor can decode it - confirm
+ RemotingCommand responseToReturn =
changeInvisibleTimeProcessor.processRequest(handlerContext, request);
+ assertThat(responseToReturn.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
assertThat(responseToReturn.getOpaque()).isEqualTo(request.getOpaque()); // the response must echo the request's opaque id
+ }
+
+ @Test // Captures the messages put through the escape bridge and checks that the CK-tagged one deserializes to a PopCheckPoint with suspend=true.
+ public void
testAppendCheckPointThenAckOriginWritesSuspendTrueInCheckpoint() throws
Exception {
+ when(messageStore.getMaxOffsetInQueue(anyString(),
anyInt())).thenReturn(2L); // queueOffset 0 used below is under the stubbed max offset
+ ArgumentCaptor<MessageExtBrokerInner> msgCaptor =
ArgumentCaptor.forClass(MessageExtBrokerInner.class);
+ when(escapeBridge.asyncPutMessageToSpecificQueue(msgCaptor.capture())) // records every message handed to the escape bridge
+ .thenReturn(CompletableFuture.completedFuture(new
PutMessageResult(PutMessageStatus.PUT_OK, new
AppendMessageResult(AppendMessageStatus.PUT_OK))));
+
+ int queueId = 0;
+ long queueOffset = 0;
+ long popTime = System.currentTimeMillis() - 1_000; // pop happened one second ago
+ long invisibleTime = 30_000;
+ int reviveQid = 0;
+ String brokerName = "test_broker";
+ String extraInfo = ExtraInfoUtil.buildExtraInfo(queueOffset, popTime,
invisibleTime, reviveQid,
+ topic, brokerName, queueId) + MessageConst.KEY_SEPARATOR +
queueOffset; // queueOffset is appended once more after a KEY_SEPARATOR
+
+ ChangeInvisibleTimeRequestHeader requestHeader = new
ChangeInvisibleTimeRequestHeader();
+ requestHeader.setTopic(topic);
+ requestHeader.setQueueId(queueId);
+ requestHeader.setOffset(queueOffset);
+ requestHeader.setConsumerGroup(group);
+ requestHeader.setExtraInfo(extraInfo);
+ requestHeader.setInvisibleTime(invisibleTime);
+ requestHeader.setSuspend(true); // the value expected to surface in the captured checkpoint
+
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.CHANGE_MESSAGE_INVISIBLETIME,
requestHeader);
+ request.makeCustomHeaderToNet(); // NOTE(review): presumably serializes the header into the command so the processor can decode it - confirm
+ changeInvisibleTimeProcessor.processRequest(handlerContext, request); // response is ignored; only the captured puts are inspected
+
+ List<MessageExtBrokerInner> allValues = msgCaptor.getAllValues();
+ MessageExtBrokerInner ckMessage = allValues.stream()
+ .filter(m -> PopAckConstants.CK_TAG.equals(m.getTags())) // keep only the message tagged with CK_TAG
+ .findFirst()
+ .orElseThrow(() -> new AssertionError("No CK message captured")); // fail the test when no CK-tagged message was put
+ PopCheckPoint ck = JSON.parseObject(new String(ckMessage.getBody(),
java.nio.charset.StandardCharsets.UTF_8), PopCheckPoint.class); // message body is decoded as UTF-8 JSON into a PopCheckPoint
+ assertThat(ck.isSuspend()).isTrue(); // the checkpoint must carry the suspend=true set on the request header
+ }
+
+ @Test // Captures the messages put through the escape bridge and checks that the CK-tagged one deserializes to a PopCheckPoint with suspend=false.
+ public void
testAppendCheckPointThenAckOriginWritesSuspendFalseInCheckpoint() throws
Exception {
+ when(messageStore.getMaxOffsetInQueue(anyString(),
anyInt())).thenReturn(2L); // queueOffset 0 used below is under the stubbed max offset
+ ArgumentCaptor<MessageExtBrokerInner> msgCaptor =
ArgumentCaptor.forClass(MessageExtBrokerInner.class);
+ when(escapeBridge.asyncPutMessageToSpecificQueue(msgCaptor.capture())) // records every message handed to the escape bridge
+ .thenReturn(CompletableFuture.completedFuture(new
PutMessageResult(PutMessageStatus.PUT_OK, new
AppendMessageResult(AppendMessageStatus.PUT_OK))));
+
+ int queueId = 0;
+ long queueOffset = 0;
+ long popTime = System.currentTimeMillis() - 1_000; // pop happened one second ago
+ long invisibleTime = 30_000;
+ int reviveQid = 0;
+ String brokerName = "test_broker";
+ String extraInfo = ExtraInfoUtil.buildExtraInfo(queueOffset, popTime,
invisibleTime, reviveQid,
+ topic, brokerName, queueId) + MessageConst.KEY_SEPARATOR +
queueOffset; // queueOffset is appended once more after a KEY_SEPARATOR
+
+ ChangeInvisibleTimeRequestHeader requestHeader = new
ChangeInvisibleTimeRequestHeader();
+ requestHeader.setTopic(topic);
+ requestHeader.setQueueId(queueId);
+ requestHeader.setOffset(queueOffset);
+ requestHeader.setConsumerGroup(group);
+ requestHeader.setExtraInfo(extraInfo);
+ requestHeader.setInvisibleTime(invisibleTime);
+ requestHeader.setSuspend(false); // the value expected to surface in the captured checkpoint
+
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.CHANGE_MESSAGE_INVISIBLETIME,
requestHeader);
+ request.makeCustomHeaderToNet(); // NOTE(review): presumably serializes the header into the command so the processor can decode it - confirm
+ changeInvisibleTimeProcessor.processRequest(handlerContext, request); // response is ignored; only the captured puts are inspected
+
+ List<MessageExtBrokerInner> allValues = msgCaptor.getAllValues();
+ MessageExtBrokerInner ckMessage = allValues.stream()
+ .filter(m -> PopAckConstants.CK_TAG.equals(m.getTags())) // keep only the message tagged with CK_TAG
+ .findFirst()
+ .orElseThrow(() -> new AssertionError("No CK message captured")); // fail the test when no CK-tagged message was put
+ PopCheckPoint ck = JSON.parseObject(new String(ckMessage.getBody(),
java.nio.charset.StandardCharsets.UTF_8), PopCheckPoint.class); // message body is decoded as UTF-8 JSON into a PopCheckPoint
+ assertThat(ck.isSuspend()).isFalse(); // the checkpoint must carry the suspend=false set on the request header
+ }
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageResponseStreamWriter.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageResponseStreamWriter.java
index 843c0edec1..69bd2a6bc4 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageResponseStreamWriter.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageResponseStreamWriter.java
@@ -136,7 +136,10 @@ public class ReceiveMessageResponseStreamWriter {
messageExt.getMsgId(),
request.getGroup().getName(),
request.getMessageQueue().getTopic().getName(),
- NACK_INVISIBLE_TIME
+ NACK_INVISIBLE_TIME,
+ null,
+ MessagingProcessor.DEFAULT_TIMEOUT_MILLS,
+ true
);
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
index cd93aed0f7..dc0f86aae5 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
@@ -198,6 +198,18 @@ public class ConsumerProcessor extends AbstractProcessor {
liteTopic,
MessagingProcessor.DEFAULT_TIMEOUT_MILLS);
break;
+ case TO_RETURN:
+ this.messagingProcessor.changeInvisibleTime(
+ ctx,
+ ReceiptHandle.decode(handleString),
+ messageExt.getMsgId(),
+ consumerGroup,
+ topic,
+ MessagingProcessor.INVISIBLE_TIME_MS,
+ null,
+ MessagingProcessor.DEFAULT_TIMEOUT_MILLS,
+ true);
+ break;
case MATCH:
default:
messageExtList.add(messageExt);
@@ -392,8 +404,8 @@ public class ConsumerProcessor extends AbstractProcessor {
});
}
- public CompletableFuture<AckResult> changeInvisibleTime(ProxyContext ctx,
ReceiptHandle handle,
- String messageId, String groupName, String topicName, long
invisibleTime, String liteTopic, long timeoutMillis) {
+ public CompletableFuture<AckResult> changeInvisibleTime(ProxyContext ctx,
ReceiptHandle handle, String messageId,
+ String groupName, String topicName, long invisibleTime, String
liteTopic, long timeoutMillis, boolean suspend) {
CompletableFuture<AckResult> future = new CompletableFuture<>();
try {
this.validateReceiptHandle(handle);
@@ -406,6 +418,7 @@ public class ConsumerProcessor extends AbstractProcessor {
changeInvisibleTimeRequestHeader.setOffset(handle.getOffset());
changeInvisibleTimeRequestHeader.setInvisibleTime(invisibleTime);
changeInvisibleTimeRequestHeader.setLiteTopic(liteTopic);
+ changeInvisibleTimeRequestHeader.setSuspend(suspend);
long commitLogOffset = handle.getCommitLogOffset();
future =
this.serviceManager.getMessageService().changeInvisibleTime(
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
index bc044ec7a1..60c9261050 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
@@ -234,16 +234,9 @@ public class DefaultMessagingProcessor extends
AbstractStartAndShutdown implemen
@Override
public CompletableFuture<AckResult> changeInvisibleTime(ProxyContext ctx,
ReceiptHandle handle, String messageId,
- String groupName, String topicName, long invisibleTime, long
timeoutMillis) {
+ String groupName, String topicName, long invisibleTime, String
liteTopic, long timeoutMillis, boolean suspend) {
return this.consumerProcessor.changeInvisibleTime(ctx, handle,
messageId, groupName, topicName,
- invisibleTime, null, timeoutMillis);
- }
-
- @Override
- public CompletableFuture<AckResult> changeInvisibleTime(ProxyContext ctx,
ReceiptHandle handle, String messageId,
- String groupName, String topicName, long invisibleTime, String
liteTopic, long timeoutMillis) {
- return this.consumerProcessor.changeInvisibleTime(ctx, handle,
messageId, groupName, topicName,
- invisibleTime, liteTopic, timeoutMillis);
+ invisibleTime, liteTopic, timeoutMillis, suspend);
}
@Override
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
index e2c3da6745..a1500dbded 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
@@ -52,6 +52,8 @@ public interface MessagingProcessor extends StartAndShutdown {
long DEFAULT_TIMEOUT_MILLS = Duration.ofSeconds(2).toMillis();
+ long INVISIBLE_TIME_MS = Duration.ofSeconds(1).toMillis();
+
SubscriptionGroupConfig getSubscriptionGroupConfig(
ProxyContext ctx,
String consumerGroupName
@@ -243,7 +245,7 @@ public interface MessagingProcessor extends
StartAndShutdown {
return changeInvisibleTime(ctx, handle, messageId, groupName,
topicName, invisibleTime, DEFAULT_TIMEOUT_MILLS);
}
- CompletableFuture<AckResult> changeInvisibleTime(
+ default CompletableFuture<AckResult> changeInvisibleTime(
ProxyContext ctx,
ReceiptHandle handle,
String messageId,
@@ -251,7 +253,9 @@ public interface MessagingProcessor extends
StartAndShutdown {
String topicName,
long invisibleTime,
long timeoutMillis
- );
+ ) {
+ return changeInvisibleTime(ctx, handle, messageId, groupName,
topicName, invisibleTime, null, timeoutMillis, false);
+ }
default CompletableFuture<AckResult> changeInvisibleTime(
ProxyContext ctx,
@@ -262,7 +266,7 @@ public interface MessagingProcessor extends
StartAndShutdown {
long invisibleTime,
String liteTopic
) {
- return changeInvisibleTime(ctx, handle, messageId, groupName,
topicName, invisibleTime, liteTopic, DEFAULT_TIMEOUT_MILLS);
+ return changeInvisibleTime(ctx, handle, messageId, groupName,
topicName, invisibleTime, liteTopic, DEFAULT_TIMEOUT_MILLS, false);
}
CompletableFuture<AckResult> changeInvisibleTime(
@@ -273,7 +277,8 @@ public interface MessagingProcessor extends
StartAndShutdown {
String topicName,
long invisibleTime,
String liteTopic,
- long timeoutMillis
+ long timeoutMillis,
+ boolean suspend
);
CompletableFuture<PullResult> pullMessage(
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/PopMessageResultFilter.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/PopMessageResultFilter.java
index 09c1a0bf1a..60e888ca3d 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/PopMessageResultFilter.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/PopMessageResultFilter.java
@@ -25,7 +25,8 @@ public interface PopMessageResultFilter {
enum FilterResult {
TO_DLQ,
NO_MATCH,
- MATCH
+ MATCH,
+ TO_RETURN
}
FilterResult filterMessage(ProxyContext ctx, String consumerGroup,
SubscriptionData subscriptionData,
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
index b002db19b5..f7074dedd6 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
@@ -276,7 +276,10 @@ public class ReceiveMessageActivityTest extends
BaseActivityTest {
msgIdCaptor.capture(),
anyString(),
anyString(),
- anyLong())).thenReturn(CompletableFuture.completedFuture(new
AckResult()));
+ anyLong(),
+ any(),
+ anyLong(),
+ anyBoolean())).thenReturn(CompletableFuture.completedFuture(new
AckResult()));
// normal
ProxyContext ctx = createContext();
@@ -308,7 +311,10 @@ public class ReceiveMessageActivityTest extends
BaseActivityTest {
anyString(),
anyString(),
anyString(),
- anyLong());
+ anyLong(),
+ any(),
+ anyLong(),
+ anyBoolean());
assertEquals(Arrays.asList(msgId1, msgId2),
msgIdCaptor.getAllValues());
assertEquals(Arrays.asList(popCk1, popCk2),
receiptHandleCaptor.getAllValues().stream().map(ReceiptHandle::encode).collect(Collectors.toList()));
}
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageResponseStreamWriterTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageResponseStreamWriterTest.java
index a717c78ca1..2bc281376e 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageResponseStreamWriterTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageResponseStreamWriterTest.java
@@ -38,6 +38,7 @@ import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
+import java.lang.reflect.Method;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.grpc.v2.BaseActivityTest;
import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
@@ -48,8 +49,10 @@ import org.mockito.ArgumentCaptor;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
@@ -76,7 +79,7 @@ public class ReceiveMessageResponseStreamWriterTest extends
BaseActivityTest {
public void testWriteMessage() {
ArgumentCaptor<String> changeInvisibleTimeMsgIdCaptor =
ArgumentCaptor.forClass(String.class);
doReturn(CompletableFuture.completedFuture(mock(AckResult.class))).when(this.messagingProcessor)
- .changeInvisibleTime(any(), any(),
changeInvisibleTimeMsgIdCaptor.capture(), anyString(), anyString(), anyLong());
+ .changeInvisibleTime(any(), any(),
changeInvisibleTimeMsgIdCaptor.capture(), anyString(), anyString(), anyLong(),
any(), anyLong(), anyBoolean());
ArgumentCaptor<ReceiveMessageResponse> responseArgumentCaptor =
ArgumentCaptor.forClass(ReceiveMessageResponse.class);
AtomicInteger onNextCallNum = new AtomicInteger(0);
@@ -108,7 +111,7 @@ public class ReceiveMessageResponseStreamWriterTest extends
BaseActivityTest {
verify(streamObserver, times(1)).onCompleted();
verify(streamObserver, times(4)).onNext(any());
verify(this.messagingProcessor, times(1))
- .changeInvisibleTime(any(), any(), anyString(), anyString(),
anyString(), anyLong());
+ .changeInvisibleTime(any(), any(), anyString(), anyString(),
anyString(), anyLong(), any(), anyLong(), eq(true));
assertTrue(responseArgumentCaptor.getAllValues().get(0).hasStatus());
assertEquals(Code.OK,
responseArgumentCaptor.getAllValues().get(0).getStatus().getCode());
@@ -125,7 +128,7 @@ public class ReceiveMessageResponseStreamWriterTest extends
BaseActivityTest {
popResult
);
verify(this.messagingProcessor, times(3))
- .changeInvisibleTime(any(), any(), anyString(), anyString(),
anyString(), anyLong());
+ .changeInvisibleTime(any(), any(), anyString(), anyString(),
anyString(), anyLong(), any(), anyLong(), eq(true));
}
@Test
@@ -152,6 +155,58 @@ public class ReceiveMessageResponseStreamWriterTest
extends BaseActivityTest {
assertEquals(Code.TOO_MANY_REQUESTS, response.getStatus().getCode());
}
+ @Test // Invokes processThrowableWhenWriteMessage reflectively and checks that it calls changeInvisibleTime with NACK_INVISIBLE_TIME, a null liteTopic, the default timeout and suspend=true.
+ public void testNackMessageWithSuspendTrue() {
+ ArgumentCaptor<String> changeInvisibleTimeMsgIdCaptor =
ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor<String> changeInvisibleTimeGroupCaptor =
ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor<String> changeInvisibleTimeTopicCaptor =
ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor<Long> changeInvisibleTimeInvisibleTimeCaptor =
ArgumentCaptor.forClass(Long.class);
+ ArgumentCaptor<Boolean> changeInvisibleTimeSuspendCaptor =
ArgumentCaptor.forClass(Boolean.class);
+
+
doReturn(CompletableFuture.completedFuture(mock(AckResult.class))).when(this.messagingProcessor)
+ .changeInvisibleTime(any(), any(),
changeInvisibleTimeMsgIdCaptor.capture(),
+ changeInvisibleTimeGroupCaptor.capture(),
changeInvisibleTimeTopicCaptor.capture(),
+ changeInvisibleTimeInvisibleTimeCaptor.capture(), any(),
anyLong(),
+ changeInvisibleTimeSuspendCaptor.capture()); // stub the nine-argument overload and record msgId, group, topic, invisibleTime and suspend
+
+ MessageExt messageExt = createMessageExt(TOPIC, "tag");
+ ReceiveMessageRequest receiveMessageRequest =
ReceiveMessageRequest.newBuilder()
+ .setGroup(Resource.newBuilder().setName(CONSUMER_GROUP).build())
+
.setMessageQueue(MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(TOPIC).build()).build())
+ .build();
+
+ // Simulate nack by calling processThrowableWhenWriteMessage using
reflection
+ // This is called when an exception occurs during message processing
+ try {
+ Method method =
ReceiveMessageResponseStreamWriter.class.getDeclaredMethod(
+ "processThrowableWhenWriteMessage",
+ Throwable.class, ProxyContext.class,
ReceiveMessageRequest.class, MessageExt.class);
+ method.setAccessible(true); // NOTE(review): presumably needed because the target method is not public - confirm
+ method.invoke(writer,
+ new RuntimeException("Test exception"),
+ ProxyContext.create(),
+ receiveMessageRequest,
+ messageExt);
+ } catch (Exception e) {
+ throw new RuntimeException(e); // any lookup or invocation failure is rethrown unchecked so the test fails with the cause attached
+ }
+
+ // Verify that changeInvisibleTime was called with suspend=true
+ verify(this.messagingProcessor, times(1))
+ .changeInvisibleTime(any(), any(), eq(messageExt.getMsgId()),
+ eq(CONSUMER_GROUP), eq(TOPIC),
eq(ReceiveMessageResponseStreamWriter.NACK_INVISIBLE_TIME),
+ eq(null),
eq(org.apache.rocketmq.proxy.processor.MessagingProcessor.DEFAULT_TIMEOUT_MILLS),
+ eq(true)); // exactly one call, with a null seventh argument (liteTopic position) and suspend=true
+
+ assertEquals(messageExt.getMsgId(),
changeInvisibleTimeMsgIdCaptor.getValue());
+ assertEquals(CONSUMER_GROUP,
changeInvisibleTimeGroupCaptor.getValue());
+ assertEquals(TOPIC, changeInvisibleTimeTopicCaptor.getValue());
+ assertEquals(ReceiveMessageResponseStreamWriter.NACK_INVISIBLE_TIME,
+ changeInvisibleTimeInvisibleTimeCaptor.getValue().longValue());
+ assertTrue("Suspend should be true for nack",
changeInvisibleTimeSuspendCaptor.getValue()); // captor values repeat the checks already made by verify(...) above
+ }
+
+
private static MessageExt createMessageExt(String topic, String tags) {
String msgId = MessageClientIDSetter.createUniqID();
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java
index 9b203ef1f6..4eee5a079c 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java
@@ -61,9 +61,11 @@ import org.mockito.stubbing.Answer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyLong;
@@ -290,7 +292,7 @@ public class ConsumerProcessorTest extends
BaseProcessorTest {
.thenReturn(CompletableFuture.completedFuture(innerAckResult));
AckResult ackResult =
this.consumerProcessor.changeInvisibleTime(createContext(), handle,
MessageClientIDSetter.createUniqID(),
- CONSUMER_GROUP, TOPIC, 1000, null, 3000).get();
+ CONSUMER_GROUP, TOPIC, 1000, null, 3000, true).get();
assertEquals(AckStatus.OK, ackResult.getStatus());
assertEquals(KeyBuilder.buildPopRetryTopic(TOPIC, CONSUMER_GROUP, new
BrokerConfig().isEnableRetryTopicV2()),
requestHeaderArgumentCaptor.getValue().getTopic());
@@ -357,4 +359,106 @@ public class ConsumerProcessorTest extends
BaseProcessorTest {
.get();
assertThat(result).isEqualTo(Sets.newHashSet(mq1));
}
+
+ @Test
+ public void testPopMessageWithToReturnFilter() throws Throwable {
+ final String tag = "tag";
+ final long invisibleTime = Duration.ofSeconds(15).toMillis();
+ ArgumentCaptor<AddressableMessageQueue> messageQueueArgumentCaptor =
ArgumentCaptor.forClass(AddressableMessageQueue.class);
+ ArgumentCaptor<PopMessageRequestHeader> requestHeaderArgumentCaptor =
ArgumentCaptor.forClass(PopMessageRequestHeader.class);
+
+ List<MessageExt> messageExtList = new ArrayList<>();
+ messageExtList.add(createMessageExt(TOPIC, tag, 0, invisibleTime));
+ PopResult innerPopResult = new PopResult(PopStatus.FOUND,
messageExtList);
+ when(this.messageService.popMessage(any(),
messageQueueArgumentCaptor.capture(), requestHeaderArgumentCaptor.capture(),
anyLong()))
+ .thenReturn(CompletableFuture.completedFuture(innerPopResult));
+
+ when(this.topicRouteService.getCurrentMessageQueueView(any(),
anyString()))
+ .thenReturn(mock(MessageQueueView.class));
+
+ ArgumentCaptor<String> ackMessageIdArgumentCaptor =
ArgumentCaptor.forClass(String.class);
+ when(this.messagingProcessor.ackMessage(any(), any(),
ackMessageIdArgumentCaptor.capture(), anyString(), anyString(), any(),
anyLong()))
+
.thenReturn(CompletableFuture.completedFuture(mock(AckResult.class)));
+
+ ArgumentCaptor<String> changeInvisibleTimeMessageIdArgumentCaptor =
ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor<Long> changeInvisibleTimeInvisibleTimeArgumentCaptor =
ArgumentCaptor.forClass(Long.class);
+ ArgumentCaptor<Boolean> changeInvisibleTimeSuspendArgumentCaptor =
ArgumentCaptor.forClass(Boolean.class);
+ when(this.messagingProcessor.changeInvisibleTime(any(), any(),
changeInvisibleTimeMessageIdArgumentCaptor.capture(),
+ anyString(), anyString(),
changeInvisibleTimeInvisibleTimeArgumentCaptor.capture(), any(), anyLong(),
+ changeInvisibleTimeSuspendArgumentCaptor.capture()))
+
.thenReturn(CompletableFuture.completedFuture(mock(AckResult.class)));
+
+ AddressableMessageQueue messageQueue =
mock(AddressableMessageQueue.class);
+ PopResult popResult = this.consumerProcessor.popMessage(
+ createContext(),
+ (ctx, messageQueueView) -> messageQueue,
+ CONSUMER_GROUP,
+ TOPIC,
+ 60,
+ invisibleTime,
+ Duration.ofSeconds(3).toMillis(),
+ ConsumeInitMode.MAX,
+ FilterAPI.build(TOPIC, tag, ExpressionType.TAG),
+ false,
+ (ctx, consumerGroup, subscriptionData, messageExt) -> {
+ // Always answer TO_RETURN: the message should be nacked back, not delivered to the caller
+ return PopMessageResultFilter.FilterResult.TO_RETURN;
+ },
+ null,
+ Duration.ofSeconds(3).toMillis()
+ ).get();
+
+ // Verify that changeInvisibleTime was called with suspend=true
+ verify(this.messagingProcessor).changeInvisibleTime(any(), any(),
eq(messageExtList.get(0).getMsgId()),
+ eq(CONSUMER_GROUP), eq(TOPIC),
eq(Duration.ofSeconds(1).toMillis()), eq(null),
+ eq(MessagingProcessor.DEFAULT_TIMEOUT_MILLS), eq(true));
+
+ // Verify that the message was NOT added to the result list
+ assertEquals(PopStatus.FOUND, popResult.getPopStatus());
+ assertEquals(0, popResult.getMsgFoundList().size());
+ }
+
+ @Test
+ public void testChangeInvisibleTimeWithSuspendFalse() throws Throwable {
+ ReceiptHandle handle =
create(createMessageExt(MixAll.RETRY_GROUP_TOPIC_PREFIX + TOPIC, "", 0, 3000));
+ assertNotNull(handle);
+
+ ArgumentCaptor<ChangeInvisibleTimeRequestHeader>
requestHeaderArgumentCaptor =
ArgumentCaptor.forClass(ChangeInvisibleTimeRequestHeader.class);
+ AckResult innerAckResult = new AckResult();
+ innerAckResult.setStatus(AckStatus.OK);
+ when(this.messageService.changeInvisibleTime(any(), any(),
anyString(), requestHeaderArgumentCaptor.capture(), anyLong()))
+ .thenReturn(CompletableFuture.completedFuture(innerAckResult));
+
+ AckResult ackResult =
this.consumerProcessor.changeInvisibleTime(createContext(), handle,
MessageClientIDSetter.createUniqID(),
+ CONSUMER_GROUP, TOPIC, 1000, null, 3000, false).get();
+
+ assertEquals(AckStatus.OK, ackResult.getStatus());
+ assertEquals(KeyBuilder.buildPopRetryTopic(TOPIC, CONSUMER_GROUP, new
BrokerConfig().isEnableRetryTopicV2()),
requestHeaderArgumentCaptor.getValue().getTopic());
+ assertEquals(CONSUMER_GROUP,
requestHeaderArgumentCaptor.getValue().getConsumerGroup());
+ assertEquals(1000,
requestHeaderArgumentCaptor.getValue().getInvisibleTime().longValue());
+ assertEquals(handle.getReceiptHandle(),
requestHeaderArgumentCaptor.getValue().getExtraInfo());
+ assertFalse("Suspend should be false",
requestHeaderArgumentCaptor.getValue().isSuspend());
+ }
+
+ @Test
+ public void testChangeInvisibleTimeWithSuspendTrue() throws Throwable {
+ ReceiptHandle handle =
create(createMessageExt(MixAll.RETRY_GROUP_TOPIC_PREFIX + TOPIC, "", 0, 3000));
+ assertNotNull(handle);
+
+ ArgumentCaptor<ChangeInvisibleTimeRequestHeader>
requestHeaderArgumentCaptor =
ArgumentCaptor.forClass(ChangeInvisibleTimeRequestHeader.class);
+ AckResult innerAckResult = new AckResult();
+ innerAckResult.setStatus(AckStatus.OK);
+ when(this.messageService.changeInvisibleTime(any(), any(),
anyString(), requestHeaderArgumentCaptor.capture(), anyLong()))
+ .thenReturn(CompletableFuture.completedFuture(innerAckResult));
+
+ AckResult ackResult =
this.consumerProcessor.changeInvisibleTime(createContext(), handle,
MessageClientIDSetter.createUniqID(),
+ CONSUMER_GROUP, TOPIC, 1000, null, 3000, true).get();
+
+ assertEquals(AckStatus.OK, ackResult.getStatus());
+ assertEquals(KeyBuilder.buildPopRetryTopic(TOPIC, CONSUMER_GROUP, new
BrokerConfig().isEnableRetryTopicV2()),
requestHeaderArgumentCaptor.getValue().getTopic());
+ assertEquals(CONSUMER_GROUP,
requestHeaderArgumentCaptor.getValue().getConsumerGroup());
+ assertEquals(1000,
requestHeaderArgumentCaptor.getValue().getInvisibleTime().longValue());
+ assertEquals(handle.getReceiptHandle(),
requestHeaderArgumentCaptor.getValue().getExtraInfo());
+ assertTrue("Suspend should be true",
requestHeaderArgumentCaptor.getValue().isSuspend());
+ }
}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ChangeInvisibleTimeRequestHeader.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ChangeInvisibleTimeRequestHeader.java
index 9d44590da3..a80b2cfb6d 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ChangeInvisibleTimeRequestHeader.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ChangeInvisibleTimeRequestHeader.java
@@ -50,6 +50,8 @@ public class ChangeInvisibleTimeRequestHeader extends
TopicQueueRequestHeader {
private String liteTopic;
+ private boolean suspend = false;
+
@Override
public void checkFields() throws RemotingCommandException {
}
@@ -113,6 +115,14 @@ public class ChangeInvisibleTimeRequestHeader extends
TopicQueueRequestHeader {
this.liteTopic = liteTopic;
}
+ public boolean isSuspend() {
+ return suspend;
+ }
+
+ public void setSuspend(boolean suspend) {
+ this.suspend = suspend;
+ }
+
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
@@ -123,6 +133,7 @@ public class ChangeInvisibleTimeRequestHeader extends
TopicQueueRequestHeader {
.add("offset", offset)
.add("invisibleTime", invisibleTime)
.add("liteTopic", liteTopic)
+ .add("suspend", suspend)
.omitNullValues()
.toString();
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java
b/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java
index e3587aa28c..803ebc6895 100644
--- a/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java
+++ b/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java
@@ -45,6 +45,8 @@ public class PopCheckPoint implements
Comparable<PopCheckPoint> {
String brokerName;
@JSONField(name = "rp")
String rePutTimes; // ck rePut times
+ @JSONField(name = "sp")
+ private boolean suspend; // when true, nack does not increment reconsume
times; defaults to false.
public long getReviveOffset() {
return reviveOffset;
@@ -148,6 +150,14 @@ public class PopCheckPoint implements
Comparable<PopCheckPoint> {
this.rePutTimes = rePutTimes;
}
+ public boolean isSuspend() {
+ return suspend;
+ }
+
+ public void setSuspend(boolean suspend) {
+ this.suspend = suspend;
+ }
+
public void addDiff(int diff) {
if (this.queueOffsetDiff == null) {
this.queueOffsetDiff = new ArrayList<>(8);
@@ -197,7 +207,7 @@ public class PopCheckPoint implements
Comparable<PopCheckPoint> {
@Override
public String toString() {
return "PopCheckPoint [topic=" + topic + ", cid=" + cid + ", queueId="
+ queueId + ", startOffset=" + startOffset + ", bitMap=" + bitMap + ", num=" +
num + ", reviveTime=" + getReviveTime()
- + ", reviveOffset=" + reviveOffset + ", diff=" + queueOffsetDiff +
", brokerName=" + brokerName + ", rePutTimes=" + rePutTimes + "]";
+ + ", reviveOffset=" + reviveOffset + ", diff=" + queueOffsetDiff +
", brokerName=" + brokerName + ", rePutTimes=" + rePutTimes + ", suspend=" +
suspend + "]";
}
@Override