This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new f80753f0da [ISSUE #10050] Support ChangeInvisibleTime without incrementing message reconsume times (#10051)
f80753f0da is described below

commit f80753f0dad40c383f6de69fd157fb1c79c934c6
Author: ltamber <[email protected]>
AuthorDate: Mon Feb 2 14:06:57 2026 +0800

    [ISSUE #10050] Support ChangeInvisibleTime without incrementing message reconsume times (#10051)
---
 .../rocketmq/broker/pop/PopConsumerRecord.java     |  18 ++
 .../rocketmq/broker/pop/PopConsumerService.java    |  16 +-
 .../processor/ChangeInvisibleTimeProcessor.java    |   3 +-
 .../broker/processor/PopReviveService.java         |   6 +-
 .../rocketmq/broker/pop/PopConsumerRecordTest.java | 185 ++++++++++++++
 .../broker/pop/PopConsumerServiceTest.java         | 277 ++++++++++++++++++++-
 .../ChangeInvisibleTimeProcessorTest.java          | 253 +++++++++++++++++++
 .../ReceiveMessageResponseStreamWriter.java        |   5 +-
 .../proxy/processor/ConsumerProcessor.java         |  17 +-
 .../proxy/processor/DefaultMessagingProcessor.java |  11 +-
 .../proxy/processor/MessagingProcessor.java        |  13 +-
 .../proxy/processor/PopMessageResultFilter.java    |   3 +-
 .../v2/consumer/ReceiveMessageActivityTest.java    |  10 +-
 .../ReceiveMessageResponseStreamWriterTest.java    |  61 ++++-
 .../proxy/processor/ConsumerProcessorTest.java     | 106 +++++++-
 .../header/ChangeInvisibleTimeRequestHeader.java   |  11 +
 .../apache/rocketmq/store/pop/PopCheckPoint.java   |  12 +-
 17 files changed, 975 insertions(+), 32 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRecord.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRecord.java
index 661ace9bcb..d10b584ef6 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRecord.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRecord.java
@@ -70,12 +70,20 @@ public class PopConsumerRecord {
     @JSONField(ordinal = 8)
     private String attemptId;
 
+    @JSONField(ordinal = 9)
+    private boolean suspend;
+
     // used for test and fastjson
     public PopConsumerRecord() {
     }
 
     public PopConsumerRecord(long popTime, String groupId, String topicId, int 
queueId,
         int retryFlag, long invisibleTime, long offset, String attemptId) {
+        this(popTime, groupId, topicId, queueId, retryFlag, invisibleTime, 
offset, attemptId, false);
+    }
+
+    public PopConsumerRecord(long popTime, String groupId, String topicId, int 
queueId, int retryFlag,
+                             long invisibleTime, long offset, String 
attemptId, boolean suspend) {
 
         this.popTime = popTime;
         this.groupId = groupId;
@@ -85,6 +93,7 @@ public class PopConsumerRecord {
         this.invisibleTime = invisibleTime;
         this.offset = offset;
         this.attemptId = attemptId;
+        this.suspend = suspend;
     }
 
     @JSONField(serialize = false)
@@ -194,6 +203,14 @@ public class PopConsumerRecord {
         this.attemptId = attemptId;
     }
 
+    public boolean isSuspend() {
+        return suspend;
+    }
+
+    public void setSuspend(boolean suspend) {
+        this.suspend = suspend;
+    }
+
     @Override
     public String toString() {
         return "PopDeliveryRecord{" +
@@ -206,6 +223,7 @@ public class PopConsumerRecord {
             ", offset=" + offset +
             ", attemptTimes=" + attemptTimes +
             ", attemptId='" + attemptId + '\'' +
+            ", suspend=" + suspend +
             '}';
     }
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
index a1356c2847..d76651643d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
@@ -486,8 +486,9 @@ public class PopConsumerService extends ServiceThread {
     }
 
     // refer ChangeInvisibleTimeProcessor.appendCheckPointThenAckOrigin
-    public void changeInvisibilityDuration(long popTime, long invisibleTime,
-        long changedPopTime, long changedInvisibleTime, String groupId, String 
topicId, int queueId, long offset) {
+    public void changeInvisibilityDuration(long popTime, long invisibleTime, 
long changedPopTime,
+                                           long changedInvisibleTime, String 
groupId, String topicId,
+                                           int queueId, long offset, boolean 
suspend) {
 
         if (brokerConfig.isPopConsumerKVServiceLog()) {
             log.info("PopConsumerService change, time={}, invisible={}, " +
@@ -496,10 +497,10 @@ public class PopConsumerService extends ServiceThread {
         }
 
         PopConsumerRecord ckRecord = new PopConsumerRecord(
-            changedPopTime, groupId, topicId, queueId, 0, 
changedInvisibleTime, offset, null);
+            changedPopTime, groupId, topicId, queueId, 0, 
changedInvisibleTime, offset, null, suspend);
 
         PopConsumerRecord ackRecord = new PopConsumerRecord(
-            popTime, groupId, topicId, queueId, 0, invisibleTime, offset, 
null);
+            popTime, groupId, topicId, queueId, 0, invisibleTime, offset, 
null, suspend);
 
         // No need to generate new records when the group does not exist,
         // because these retry messages will not be consumed by anyone.
@@ -689,7 +690,12 @@ public class PopConsumerService extends ServiceThread {
         msgInner.setSysFlag(messageExt.getSysFlag());
         msgInner.setBornHost(brokerController.getStoreHost());
         msgInner.setStoreHost(brokerController.getStoreHost());
-        msgInner.setReconsumeTimes(messageExt.getReconsumeTimes() + 1);
+        if (record.isSuspend()) {
+            msgInner.setReconsumeTimes(messageExt.getReconsumeTimes());
+        } else {
+            msgInner.setReconsumeTimes(messageExt.getReconsumeTimes() + 1);
+        }
+
         msgInner.getProperties().putAll(messageExt.getProperties());
 
         // set first pop time here
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
index 133e13ccb2..a8b01ceed2 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
@@ -153,7 +153,7 @@ public class ChangeInvisibleTimeProcessor implements NettyRequestProcessor {
                 
brokerController.getPopConsumerService().changeInvisibilityDuration(
                     ExtraInfoUtil.getPopTime(extraInfo), 
ExtraInfoUtil.getInvisibleTime(extraInfo), current,
                     requestHeader.getInvisibleTime(), 
requestHeader.getConsumerGroup(), requestHeader.getTopic(),
-                    requestHeader.getQueueId(), requestHeader.getOffset());
+                    requestHeader.getQueueId(), requestHeader.getOffset(), 
requestHeader.isSuspend());
                 
responseHeader.setInvisibleTime(requestHeader.getInvisibleTime());
                 responseHeader.setPopTime(current);
                 
responseHeader.setReviveQid(ExtraInfoUtil.getReviveQid(extraInfo));
@@ -324,6 +324,7 @@ public class ChangeInvisibleTimeProcessor implements NettyRequestProcessor {
         ck.setQueueId(queueId);
         ck.addDiff(0);
         ck.setBrokerName(ExtraInfoUtil.getBrokerName(extraInfo));
+        ck.setSuspend(requestHeader.isSuspend());
 
         
msgInner.setBody(JSON.toJSONString(ck).getBytes(StandardCharsets.UTF_8));
         msgInner.setQueueId(reviveQid);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
index e88879d9c6..07f16e9896 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
@@ -122,7 +122,11 @@ public class PopReviveService extends ServiceThread {
         msgInner.setSysFlag(messageExt.getSysFlag());
         msgInner.setBornHost(brokerController.getStoreHost());
         msgInner.setStoreHost(brokerController.getStoreHost());
-        msgInner.setReconsumeTimes(messageExt.getReconsumeTimes() + 1);
+        if (popCheckPoint.isSuspend()) {
+            msgInner.setReconsumeTimes(messageExt.getReconsumeTimes());
+        } else {
+            msgInner.setReconsumeTimes(messageExt.getReconsumeTimes() + 1);
+        }
         msgInner.getProperties().putAll(messageExt.getProperties());
         if (messageExt.getReconsumeTimes() == 0 || 
msgInner.getProperties().get(MessageConst.PROPERTY_FIRST_POP_TIME) == null) {
             msgInner.getProperties().put(MessageConst.PROPERTY_FIRST_POP_TIME, 
String.valueOf(popCheckPoint.getPopTime()));
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerRecordTest.java b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerRecordTest.java
index 24a79b33f3..b1a1a700c5 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerRecordTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerRecordTest.java
@@ -72,4 +72,189 @@ public class PopConsumerRecordTest {
         Assert.assertEquals(0, consumerRecord2.getAttemptTimes());
         Assert.assertEquals(decodeRecord.getAttemptId(), 
consumerRecord2.getAttemptId());
     }
+
+    @Test
+    public void testSuspendFlagInitialization() {
+        // Test constructor without suspend flag (should default to false)
+        PopConsumerRecord record1 = new PopConsumerRecord(
+            System.currentTimeMillis(), "test-group", "test-topic", 0, 0, 
30000L, 100L, "attempt-id");
+        Assert.assertFalse("Suspend flag should default to false", 
record1.isSuspend());
+
+        // Test constructor with suspend flag set to true
+        PopConsumerRecord record2 = new PopConsumerRecord(
+            System.currentTimeMillis(), "test-group", "test-topic", 0, 0, 
30000L, 100L, "attempt-id", true);
+        Assert.assertTrue("Suspend flag should be true", record2.isSuspend());
+
+        // Test constructor with suspend flag set to false
+        PopConsumerRecord record3 = new PopConsumerRecord(
+            System.currentTimeMillis(), "test-group", "test-topic", 0, 0, 
30000L, 100L, "attempt-id", false);
+        Assert.assertFalse("Suspend flag should be false", 
record3.isSuspend());
+    }
+
+    @Test
+    public void testSuspendFlagSerialization() {
+        // Test serialization/deserialization with suspend flag
+        PopConsumerRecord originalRecord = new PopConsumerRecord(
+            1234567890L, "test-group", "test-topic", 0, 0, 30000L, 100L, 
"attempt-id", true);
+
+        byte[] serialized = originalRecord.getValueBytes();
+        PopConsumerRecord deserialized = PopConsumerRecord.decode(serialized);
+
+        Assert.assertTrue("Deserialized record should have suspend flag true", 
deserialized.isSuspend());
+        Assert.assertEquals("Other fields should match", 
originalRecord.getGroupId(), deserialized.getGroupId());
+        Assert.assertEquals("Other fields should match", 
originalRecord.getTopicId(), deserialized.getTopicId());
+        Assert.assertEquals("Other fields should match", 
originalRecord.getOffset(), deserialized.getOffset());
+    }
+
+    @Test
+    public void testSuspendFlagGetterSetter() {
+        PopConsumerRecord record = new PopConsumerRecord();
+
+        // Test initial value
+        Assert.assertFalse("Initial suspend value should be false", 
record.isSuspend());
+
+        // Test setter
+        record.setSuspend(true);
+        Assert.assertTrue("After setting to true, should be true", 
record.isSuspend());
+
+        record.setSuspend(false);
+        Assert.assertFalse("After setting to false, should be false", 
record.isSuspend());
+    }
+
+    @Test
+    public void testSuspendInToString() {
+        PopConsumerRecord record = new PopConsumerRecord(
+            1234567890L, "test-group", "test-topic", 0, 0, 30000L, 100L, 
"attempt-id", true);
+
+        String toString = record.toString();
+        Assert.assertTrue("toString should include suspend information", 
toString.contains("suspend=true"));
+
+        PopConsumerRecord record2 = new PopConsumerRecord(
+            1234567890L, "test-group", "test-topic", 0, 0, 30000L, 100L, 
"attempt-id", false);
+
+        String toString2 = record2.toString();
+        Assert.assertTrue("toString should include suspend information", 
toString2.contains("suspend=false"));
+    }
+
+    @Test
+    public void testSuspendFlagSerializationWithFalse() {
+        // Test serialization/deserialization with suspend flag set to false
+        PopConsumerRecord originalRecord = new PopConsumerRecord(
+            1234567890L, "test-group", "test-topic", 0, 0, 30000L, 100L, 
"attempt-id", false);
+
+        byte[] serialized = originalRecord.getValueBytes();
+        PopConsumerRecord deserialized = PopConsumerRecord.decode(serialized);
+
+        Assert.assertFalse("Deserialized record should have suspend flag 
false", deserialized.isSuspend());
+        Assert.assertEquals("GroupId should match", 
originalRecord.getGroupId(), deserialized.getGroupId());
+        Assert.assertEquals("TopicId should match", 
originalRecord.getTopicId(), deserialized.getTopicId());
+        Assert.assertEquals("Offset should match", originalRecord.getOffset(), 
deserialized.getOffset());
+        Assert.assertEquals("PopTime should match", 
originalRecord.getPopTime(), deserialized.getPopTime());
+        Assert.assertEquals("QueueId should match", 
originalRecord.getQueueId(), deserialized.getQueueId());
+        Assert.assertEquals("InvisibleTime should match", 
originalRecord.getInvisibleTime(), deserialized.getInvisibleTime());
+        Assert.assertEquals("RetryFlag should match", 
originalRecord.getRetryFlag(), deserialized.getRetryFlag());
+        Assert.assertEquals("AttemptId should match", 
originalRecord.getAttemptId(), deserialized.getAttemptId());
+    }
+
+    @Test
+    public void testSuspendFlagJSONSerializationCompleteness() {
+        // Test complete serialization/deserialization with all fields 
including suspend
+        long popTime = System.currentTimeMillis();
+        String groupId = "test-group";
+        String topicId = "test-topic";
+        int queueId = 1;
+        int retryFlag = PopConsumerRecord.RetryType.RETRY_TOPIC_V2.getCode();
+        long invisibleTime = 30000L;
+        long offset = 100L;
+        String attemptId = UUID.randomUUID().toString().toUpperCase();
+
+        // Test with suspend = true
+        PopConsumerRecord recordWithSuspend = new PopConsumerRecord(
+            popTime, groupId, topicId, queueId, retryFlag, invisibleTime, 
offset, attemptId, true);
+        recordWithSuspend.setAttemptTimes(3);
+
+        byte[] serialized = recordWithSuspend.getValueBytes();
+        PopConsumerRecord deserialized = PopConsumerRecord.decode(serialized);
+
+        Assert.assertTrue("Suspend flag should be true", 
deserialized.isSuspend());
+        Assert.assertEquals("PopTime should match", popTime, 
deserialized.getPopTime());
+        Assert.assertEquals("GroupId should match", groupId, 
deserialized.getGroupId());
+        Assert.assertEquals("TopicId should match", topicId, 
deserialized.getTopicId());
+        Assert.assertEquals("QueueId should match", queueId, 
deserialized.getQueueId());
+        Assert.assertEquals("RetryFlag should match", retryFlag, 
deserialized.getRetryFlag());
+        Assert.assertEquals("InvisibleTime should match", invisibleTime, 
deserialized.getInvisibleTime());
+        Assert.assertEquals("Offset should match", offset, 
deserialized.getOffset());
+        Assert.assertEquals("AttemptTimes should match", 3, 
deserialized.getAttemptTimes());
+        Assert.assertEquals("AttemptId should match", attemptId, 
deserialized.getAttemptId());
+
+        // Test with suspend = false
+        PopConsumerRecord recordWithoutSuspend = new PopConsumerRecord(
+            popTime, groupId, topicId, queueId, retryFlag, invisibleTime, 
offset, attemptId, false);
+        recordWithoutSuspend.setAttemptTimes(3);
+
+        serialized = recordWithoutSuspend.getValueBytes();
+        deserialized = PopConsumerRecord.decode(serialized);
+
+        Assert.assertFalse("Suspend flag should be false", 
deserialized.isSuspend());
+        Assert.assertEquals("PopTime should match", popTime, 
deserialized.getPopTime());
+        Assert.assertEquals("GroupId should match", groupId, 
deserialized.getGroupId());
+        Assert.assertEquals("TopicId should match", topicId, 
deserialized.getTopicId());
+        Assert.assertEquals("QueueId should match", queueId, 
deserialized.getQueueId());
+        Assert.assertEquals("RetryFlag should match", retryFlag, 
deserialized.getRetryFlag());
+        Assert.assertEquals("InvisibleTime should match", invisibleTime, 
deserialized.getInvisibleTime());
+        Assert.assertEquals("Offset should match", offset, 
deserialized.getOffset());
+        Assert.assertEquals("AttemptTimes should match", 3, 
deserialized.getAttemptTimes());
+        Assert.assertEquals("AttemptId should match", attemptId, 
deserialized.getAttemptId());
+    }
+
+    @Test
+    public void testSuspendFlagDefaultValueInNoArgConstructor() {
+        // Test that no-arg constructor defaults suspend to false
+        PopConsumerRecord record = new PopConsumerRecord();
+        Assert.assertFalse("No-arg constructor should default suspend to 
false", record.isSuspend());
+
+        // Set all fields manually
+        record.setPopTime(System.currentTimeMillis());
+        record.setGroupId("test-group");
+        record.setTopicId("test-topic");
+        record.setQueueId(0);
+        record.setRetryFlag(0);
+        record.setInvisibleTime(30000L);
+        record.setOffset(100L);
+        record.setAttemptId("attempt-id");
+        record.setSuspend(true);
+
+        Assert.assertTrue("After setting suspend to true, should be true", 
record.isSuspend());
+
+        // Serialize and deserialize to verify
+        byte[] serialized = record.getValueBytes();
+        PopConsumerRecord deserialized = PopConsumerRecord.decode(serialized);
+        Assert.assertTrue("Deserialized record should preserve suspend=true", 
deserialized.isSuspend());
+    }
+
+    @Test
+    public void testSuspendFlagInDeliveryRecordSerializeTest() {
+        // Enhance existing deliveryRecordSerializeTest to include suspend flag
+        PopConsumerRecord consumerRecord = new PopConsumerRecord();
+        consumerRecord.setPopTime(System.currentTimeMillis());
+        consumerRecord.setGroupId("GroupId");
+        consumerRecord.setTopicId("TopicId");
+        consumerRecord.setQueueId(3);
+        
consumerRecord.setRetryFlag(PopConsumerRecord.RetryType.RETRY_TOPIC_V1.getCode());
+        consumerRecord.setInvisibleTime(20);
+        consumerRecord.setOffset(100);
+        consumerRecord.setAttemptTimes(2);
+        
consumerRecord.setAttemptId(UUID.randomUUID().toString().toUpperCase());
+        consumerRecord.setSuspend(true);
+
+        PopConsumerRecord decodeRecord = 
PopConsumerRecord.decode(consumerRecord.getValueBytes());
+        Assert.assertTrue("Decoded record should preserve suspend flag", 
decodeRecord.isSuspend());
+        Assert.assertEquals("Suspend flag should match", 
consumerRecord.isSuspend(), decodeRecord.isSuspend());
+
+        // Test with suspend = false
+        consumerRecord.setSuspend(false);
+        decodeRecord = 
PopConsumerRecord.decode(consumerRecord.getValueBytes());
+        Assert.assertFalse("Decoded record should preserve suspend=false", 
decodeRecord.isSuspend());
+        Assert.assertEquals("Suspend flag should match", 
consumerRecord.isSuspend(), decodeRecord.isSuspend());
+    }
 }
\ No newline at end of file
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java
index 69cadb3de2..089d2c1f22 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java
@@ -62,6 +62,7 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
 import static org.mockito.ArgumentMatchers.any;
@@ -320,7 +321,7 @@ public class PopConsumerServiceTest {
         consumerService.ackAsync(
             current, 10, groupId, topicId, queueId, 100).join();
         consumerService.changeInvisibilityDuration(current, 10,
-            current + 100, 10, groupId, topicId, queueId, 100);
+            current + 100, 10, groupId, topicId, queueId, 100, false);
         consumerService.shutdown();
     }
 
@@ -468,4 +469,278 @@ public class PopConsumerServiceTest {
         consumerService.transferToFsStore();
         consumerService.shutdown();
     }
+
+    @Test
+    public void testChangeInvisibilityDurationWithSuspendTrue() {
+        long current = System.currentTimeMillis();
+        long popTime = current - 1000;
+        long invisibleTime = 10000;
+        long changedPopTime = current;
+        long changedInvisibleTime = 20000;
+        long offset = 100L;
+
+        consumerService.getPopConsumerStore().start();
+        
Mockito.when(brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(groupId)).thenReturn(true);
+
+        // Test with suspend = true
+        consumerService.changeInvisibilityDuration(popTime, invisibleTime, 
changedPopTime,
+            changedInvisibleTime, groupId, topicId, queueId, offset, true);
+
+        // Verify that the record was written with suspend = true
+        List<PopConsumerRecord> records = consumerService.getPopConsumerStore()
+            .scanExpiredRecords(0, changedPopTime + changedInvisibleTime + 
1000, 10);
+        Assert.assertFalse("Should have at least one record", 
records.isEmpty());
+        PopConsumerRecord ckRecord = records.stream()
+            .filter(r -> r.getOffset() == offset && r.getPopTime() == 
changedPopTime)
+            .findFirst()
+            .orElse(null);
+        Assert.assertNotNull("Should find the checkpoint record", ckRecord);
+        Assert.assertTrue("Suspend flag should be true", ckRecord.isSuspend());
+        Assert.assertEquals("GroupId should match", groupId, 
ckRecord.getGroupId());
+        Assert.assertEquals("TopicId should match", topicId, 
ckRecord.getTopicId());
+        Assert.assertEquals("QueueId should match", queueId, 
ckRecord.getQueueId());
+        Assert.assertEquals("Offset should match", offset, 
ckRecord.getOffset());
+
+        consumerService.shutdown();
+    }
+
+    @Test
+    public void testChangeInvisibilityDurationWithSuspendFalse() {
+        long current = System.currentTimeMillis();
+        long popTime = current - 1000;
+        long invisibleTime = 10000;
+        long changedPopTime = current;
+        long changedInvisibleTime = 20000;
+        long offset = 200L;
+
+        consumerService.getPopConsumerStore().start();
+        
Mockito.when(brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(groupId)).thenReturn(true);
+
+        // Test with suspend = false
+        consumerService.changeInvisibilityDuration(popTime, invisibleTime, 
changedPopTime,
+            changedInvisibleTime, groupId, topicId, queueId, offset, false);
+
+        // Verify that the record was written with suspend = false
+        List<PopConsumerRecord> records = consumerService.getPopConsumerStore()
+            .scanExpiredRecords(0, changedPopTime + changedInvisibleTime + 
1000, 10);
+        Assert.assertFalse("Should have at least one record", 
records.isEmpty());
+        PopConsumerRecord ckRecord = records.stream()
+            .filter(r -> r.getOffset() == offset && r.getPopTime() == 
changedPopTime)
+            .findFirst()
+            .orElse(null);
+        Assert.assertNotNull("Should find the checkpoint record", ckRecord);
+        Assert.assertFalse("Suspend flag should be false", 
ckRecord.isSuspend());
+        Assert.assertEquals("GroupId should match", groupId, 
ckRecord.getGroupId());
+        Assert.assertEquals("TopicId should match", topicId, 
ckRecord.getTopicId());
+        Assert.assertEquals("QueueId should match", queueId, 
ckRecord.getQueueId());
+        Assert.assertEquals("Offset should match", offset, 
ckRecord.getOffset());
+
+        consumerService.shutdown();
+    }
+
+    @Test
+    public void testReviveRetryWithSuspendTrue() {
+        
Mockito.when(brokerController.getTopicConfigManager().selectTopicConfig(topicId)).thenReturn(null);
+        
Mockito.when(brokerController.getConsumerOffsetManager().queryOffset(groupId, 
topicId, 0)).thenReturn(-1L);
+
+        consumerService.createRetryTopicIfNeeded(groupId, topicId);
+        consumerService.clearCache(groupId, topicId, queueId);
+
+        // Create message with reconsumeTimes = 2
+        MessageExt messageExt = new MessageExt();
+        messageExt.setBody("body".getBytes());
+        messageExt.setBornTimestamp(System.currentTimeMillis());
+        messageExt.setFlag(0);
+        messageExt.setSysFlag(0);
+        messageExt.setReconsumeTimes(2);
+        messageExt.putUserProperty("key", "value");
+
+        // Create record with suspend = true
+        PopConsumerRecord record = new PopConsumerRecord();
+        record.setTopicId(topicId);
+        record.setGroupId(groupId);
+        record.setQueueId(queueId);
+        record.setPopTime(System.currentTimeMillis());
+        record.setInvisibleTime(30000);
+        record.setOffset(100L);
+        record.setSuspend(true);
+
+        
Mockito.when(brokerController.getBrokerStatsManager()).thenReturn(Mockito.mock(BrokerStatsManager.class));
+        EscapeBridge escapeBridge = Mockito.mock(EscapeBridge.class);
+        
Mockito.when(brokerController.getEscapeBridge()).thenReturn(escapeBridge);
+
+        // Capture the MessageExtBrokerInner to verify reconsumeTimes
+        ArgumentCaptor<MessageExtBrokerInner> messageCaptor =
+            ArgumentCaptor.forClass(MessageExtBrokerInner.class);
+        
Mockito.when(escapeBridge.putMessageToSpecificQueue(messageCaptor.capture()))
+            .thenReturn(new PutMessageResult(
+                PutMessageStatus.PUT_OK, new 
AppendMessageResult(AppendMessageStatus.PUT_OK)));
+
+        PopConsumerService consumerServiceSpy = Mockito.spy(consumerService);
+        
Mockito.doNothing().when(consumerServiceSpy).createRetryTopicIfNeeded(any(), 
any());
+        Assert.assertTrue("Revive should succeed", 
consumerServiceSpy.reviveRetry(record, messageExt));
+
+        // Verify that reconsumeTimes was NOT incremented (should remain 2)
+        MessageExtBrokerInner capturedMessage = messageCaptor.getValue();
+        Assert.assertNotNull("Message should be captured", capturedMessage);
+        Assert.assertEquals("ReconsumeTimes should remain 2 when 
suspend=true", 2, capturedMessage.getReconsumeTimes());
+    }
+
+    @Test
+    public void testReviveRetryWithSuspendFalse() {
+        
Mockito.when(brokerController.getTopicConfigManager().selectTopicConfig(topicId)).thenReturn(null);
+        
Mockito.when(brokerController.getConsumerOffsetManager().queryOffset(groupId, 
topicId, 0)).thenReturn(-1L);
+
+        consumerService.createRetryTopicIfNeeded(groupId, topicId);
+        consumerService.clearCache(groupId, topicId, queueId);
+
+        // Create message with reconsumeTimes = 2
+        MessageExt messageExt = new MessageExt();
+        messageExt.setBody("body".getBytes());
+        messageExt.setBornTimestamp(System.currentTimeMillis());
+        messageExt.setFlag(0);
+        messageExt.setSysFlag(0);
+        messageExt.setReconsumeTimes(2);
+        messageExt.putUserProperty("key", "value");
+
+        // Create record with suspend = false
+        PopConsumerRecord record = new PopConsumerRecord();
+        record.setTopicId(topicId);
+        record.setGroupId(groupId);
+        record.setQueueId(queueId);
+        record.setPopTime(System.currentTimeMillis());
+        record.setInvisibleTime(30000);
+        record.setOffset(200L);
+        record.setSuspend(false);
+
+        
Mockito.when(brokerController.getBrokerStatsManager()).thenReturn(Mockito.mock(BrokerStatsManager.class));
+        EscapeBridge escapeBridge = Mockito.mock(EscapeBridge.class);
+        
Mockito.when(brokerController.getEscapeBridge()).thenReturn(escapeBridge);
+
+        // Capture the MessageExtBrokerInner to verify reconsumeTimes
+        ArgumentCaptor<MessageExtBrokerInner> messageCaptor =
+            ArgumentCaptor.forClass(MessageExtBrokerInner.class);
+        
Mockito.when(escapeBridge.putMessageToSpecificQueue(messageCaptor.capture()))
+            .thenReturn(new PutMessageResult(
+                PutMessageStatus.PUT_OK, new 
AppendMessageResult(AppendMessageStatus.PUT_OK)));
+
+        PopConsumerService consumerServiceSpy = Mockito.spy(consumerService);
+        
Mockito.doNothing().when(consumerServiceSpy).createRetryTopicIfNeeded(any(), 
any());
+        Assert.assertTrue("Revive should succeed", 
consumerServiceSpy.reviveRetry(record, messageExt));
+
+        // Verify that reconsumeTimes was incremented (should be 3)
+        MessageExtBrokerInner capturedMessage = messageCaptor.getValue();
+        Assert.assertNotNull("Message should be captured", capturedMessage);
+        Assert.assertEquals("ReconsumeTimes should be incremented to 3 when 
suspend=false", 3, capturedMessage.getReconsumeTimes());
+    }
+
+    @Test
+    public void testReviveRetryWithSuspendTrueMultipleTimes() {
+        
Mockito.when(brokerController.getTopicConfigManager().selectTopicConfig(topicId)).thenReturn(null);
+        
Mockito.when(brokerController.getConsumerOffsetManager().queryOffset(groupId, 
topicId, 0)).thenReturn(-1L);
+
+        consumerService.createRetryTopicIfNeeded(groupId, topicId);
+        consumerService.clearCache(groupId, topicId, queueId);
+
+        // Create message with reconsumeTimes = 0
+        MessageExt messageExt = new MessageExt();
+        messageExt.setBody("body".getBytes());
+        messageExt.setBornTimestamp(System.currentTimeMillis());
+        messageExt.setFlag(0);
+        messageExt.setSysFlag(0);
+        messageExt.setReconsumeTimes(0);
+        messageExt.putUserProperty("key", "value");
+
+        
Mockito.when(brokerController.getBrokerStatsManager()).thenReturn(Mockito.mock(BrokerStatsManager.class));
+        EscapeBridge escapeBridge = Mockito.mock(EscapeBridge.class);
+        
Mockito.when(brokerController.getEscapeBridge()).thenReturn(escapeBridge);
+
+        PopConsumerService consumerServiceSpy = Mockito.spy(consumerService);
+        
Mockito.doNothing().when(consumerServiceSpy).createRetryTopicIfNeeded(any(), 
any());
+
+        // Simulate multiple nacks with suspend = true
+        for (int i = 0; i < 3; i++) {
+            PopConsumerRecord record = new PopConsumerRecord();
+            record.setTopicId(topicId);
+            record.setGroupId(groupId);
+            record.setQueueId(queueId);
+            record.setPopTime(System.currentTimeMillis());
+            record.setInvisibleTime(30000);
+            record.setOffset(300L + i);
+            record.setSuspend(true);
+
+            // Capture the MessageExtBrokerInner to verify reconsumeTimes
+            org.mockito.ArgumentCaptor<MessageExtBrokerInner> messageCaptor =
+                
org.mockito.ArgumentCaptor.forClass(MessageExtBrokerInner.class);
+            
Mockito.when(escapeBridge.putMessageToSpecificQueue(messageCaptor.capture()))
+                .thenReturn(new PutMessageResult(
+                    PutMessageStatus.PUT_OK, new 
AppendMessageResult(AppendMessageStatus.PUT_OK)));
+
+            Assert.assertTrue("Revive should succeed", 
consumerServiceSpy.reviveRetry(record, messageExt));
+
+            // Verify that reconsumeTimes remains 0 (not incremented)
+            MessageExtBrokerInner capturedMessage = messageCaptor.getValue();
+            Assert.assertNotNull("Message should be captured", 
capturedMessage);
+            Assert.assertEquals("ReconsumeTimes should remain 0 after " + (i + 
1) + " nacks with suspend=true",
+                0, capturedMessage.getReconsumeTimes());
+
+            // Update messageExt for next iteration (simulate the message 
being re-consumed)
+            messageExt.setReconsumeTimes(capturedMessage.getReconsumeTimes());
+        }
+    }
+
+    @Test
+    public void testReviveRetryWithSuspendFalseMultipleTimes() {
+        
Mockito.when(brokerController.getTopicConfigManager().selectTopicConfig(topicId)).thenReturn(null);
+        
Mockito.when(brokerController.getConsumerOffsetManager().queryOffset(groupId, 
topicId, 0)).thenReturn(-1L);
+
+        consumerService.createRetryTopicIfNeeded(groupId, topicId);
+        consumerService.clearCache(groupId, topicId, queueId);
+
+        // Create message with reconsumeTimes = 0
+        MessageExt messageExt = new MessageExt();
+        messageExt.setBody("body".getBytes());
+        messageExt.setBornTimestamp(System.currentTimeMillis());
+        messageExt.setFlag(0);
+        messageExt.setSysFlag(0);
+        messageExt.setReconsumeTimes(0);
+        messageExt.putUserProperty("key", "value");
+
+        
Mockito.when(brokerController.getBrokerStatsManager()).thenReturn(Mockito.mock(BrokerStatsManager.class));
+        EscapeBridge escapeBridge = Mockito.mock(EscapeBridge.class);
+        
Mockito.when(brokerController.getEscapeBridge()).thenReturn(escapeBridge);
+
+        PopConsumerService consumerServiceSpy = Mockito.spy(consumerService);
+        
Mockito.doNothing().when(consumerServiceSpy).createRetryTopicIfNeeded(any(), 
any());
+
+        // Simulate multiple nacks with suspend = false
+        for (int i = 0; i < 3; i++) {
+            PopConsumerRecord record = new PopConsumerRecord();
+            record.setTopicId(topicId);
+            record.setGroupId(groupId);
+            record.setQueueId(queueId);
+            record.setPopTime(System.currentTimeMillis());
+            record.setInvisibleTime(30000);
+            record.setOffset(400L + i);
+            record.setSuspend(false);
+
+            // Capture the MessageExtBrokerInner to verify reconsumeTimes
+            org.mockito.ArgumentCaptor<MessageExtBrokerInner> messageCaptor =
+                
org.mockito.ArgumentCaptor.forClass(MessageExtBrokerInner.class);
+            
Mockito.when(escapeBridge.putMessageToSpecificQueue(messageCaptor.capture()))
+                .thenReturn(new PutMessageResult(
+                    PutMessageStatus.PUT_OK, new 
AppendMessageResult(AppendMessageStatus.PUT_OK)));
+
+            Assert.assertTrue("Revive should succeed", 
consumerServiceSpy.reviveRetry(record, messageExt));
+
+            // Verify that reconsumeTimes is incremented each time
+            MessageExtBrokerInner capturedMessage = messageCaptor.getValue();
+            Assert.assertNotNull("Message should be captured", 
capturedMessage);
+            Assert.assertEquals("ReconsumeTimes should be " + (i + 1) + " 
after " + (i + 1) + " nacks with suspend=false",
+                i + 1, capturedMessage.getReconsumeTimes());
+
+            // Update messageExt for next iteration (simulate the message 
being re-consumed)
+            messageExt.setReconsumeTimes(capturedMessage.getReconsumeTimes());
+        }
+    }
 }
\ No newline at end of file
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
index 7afd338dca..75ce68f4bd 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
@@ -25,7 +25,9 @@ import org.apache.rocketmq.broker.failover.EscapeBridge;
 import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
 import org.apache.rocketmq.broker.metrics.PopMetricsManager;
 import org.apache.rocketmq.broker.topic.TopicConfigManager;
+import com.alibaba.fastjson2.JSON;
 import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.PopAckConstants;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExtBrokerInner;
@@ -46,16 +48,19 @@ import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.PutMessageStatus;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.pop.PopCheckPoint;
 import org.apache.rocketmq.store.exception.ConsumeQueueException;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.Spy;
 import org.mockito.junit.MockitoJUnitRunner;
 
 import java.lang.reflect.Field;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -250,4 +255,252 @@ public class ChangeInvisibleTimeProcessorTest {
         assertNotNull(response);
         assertEquals(ResponseCode.SUCCESS, response.getCode());
     }
+
+    @Test
+    public void testProcessRequestAsyncWithSuspendTrue() throws Exception {
+        // Setup mocks
+        Channel mockChannel = mock(Channel.class);
+        RemotingCommand mockRequest = mock(RemotingCommand.class);
+        BrokerController mockBrokerController = mock(BrokerController.class);
+        TopicConfigManager mockTopicConfigManager = 
mock(TopicConfigManager.class);
+        MessageStore mockMessageStore = mock(MessageStore.class);
+        BrokerConfig mockBrokerConfig = mock(BrokerConfig.class);
+        BrokerStatsManager mockBrokerStatsManager = 
mock(BrokerStatsManager.class);
+        PopMessageProcessor mockPopMessageProcessor = 
mock(PopMessageProcessor.class);
+        PopBufferMergeService mockPopBufferMergeService = 
mock(PopBufferMergeService.class);
+        BrokerMetricsManager brokerMetricsManager = 
mock(BrokerMetricsManager.class);
+        PopMetricsManager popMetricsManager = mock(PopMetricsManager.class);
+        EscapeBridge mockEscapeBridge = mock(EscapeBridge.class);
+
+        
when(brokerMetricsManager.getPopMetricsManager()).thenReturn(popMetricsManager);
+        
when(mockBrokerController.getBrokerMetricsManager()).thenReturn(brokerMetricsManager);
+        doNothing().when(popMetricsManager).incPopReviveCkPutCount(any(), 
any());
+        
when(mockBrokerController.getTopicConfigManager()).thenReturn(mockTopicConfigManager);
+        
when(mockBrokerController.getMessageStore()).thenReturn(mockMessageStore);
+        
when(mockBrokerController.getBrokerConfig()).thenReturn(mockBrokerConfig);
+        
when(mockBrokerController.getBrokerStatsManager()).thenReturn(mockBrokerStatsManager);
+        
when(mockBrokerController.getPopMessageProcessor()).thenReturn(mockPopMessageProcessor);
+        
when(mockPopMessageProcessor.getPopBufferMergeService()).thenReturn(mockPopBufferMergeService);
+        when(mockPopBufferMergeService.addAk(anyInt(), 
any())).thenReturn(false);
+        
when(mockBrokerController.getEscapeBridge()).thenReturn(mockEscapeBridge);
+
+        PutMessageResult mockPutMessageResult = new 
PutMessageResult(PutMessageStatus.PUT_OK, null, true);
+        when(mockEscapeBridge.asyncPutMessageToSpecificQueue(any()))
+                
.thenReturn(CompletableFuture.completedFuture(mockPutMessageResult));
+
+        TopicConfig topicConfig = new TopicConfig();
+        topicConfig.setReadQueueNums(4);
+        
when(mockTopicConfigManager.selectTopicConfig(anyString())).thenReturn(topicConfig);
+        when(mockMessageStore.getMinOffsetInQueue(anyString(), 
anyInt())).thenReturn(0L);
+        when(mockMessageStore.getMaxOffsetInQueue(anyString(), 
anyInt())).thenReturn(10L);
+        
when(mockBrokerConfig.isPopConsumerKVServiceEnable()).thenReturn(false);
+
+        ChangeInvisibleTimeRequestHeader requestHeader = new 
ChangeInvisibleTimeRequestHeader();
+        requestHeader.setTopic("TestTopic");
+        requestHeader.setQueueId(1);
+        requestHeader.setOffset(5L);
+        requestHeader.setConsumerGroup("TestGroup");
+        requestHeader.setExtraInfo("0 10000 10000 0 TestBroker 1");
+        requestHeader.setInvisibleTime(60000L);
+        requestHeader.setSuspend(true); // Test with suspend=true
+        
when(mockRequest.decodeCommandCustomHeader(ChangeInvisibleTimeRequestHeader.class)).thenReturn(requestHeader);
+
+        ChangeInvisibleTimeProcessor processor = new 
ChangeInvisibleTimeProcessor(mockBrokerController);
+        CompletableFuture<RemotingCommand> futureResponse = 
processor.processRequestAsync(mockChannel, mockRequest, true);
+
+        RemotingCommand response = futureResponse.get();
+        assertNotNull(response);
+        assertEquals(ResponseCode.SUCCESS, response.getCode());
+    }
+
+    @Test
+    public void testProcessRequestAsyncWithSuspendFalse() throws Exception {
+        // Setup mocks
+        Channel mockChannel = mock(Channel.class);
+        RemotingCommand mockRequest = mock(RemotingCommand.class);
+        BrokerController mockBrokerController = mock(BrokerController.class);
+        TopicConfigManager mockTopicConfigManager = 
mock(TopicConfigManager.class);
+        MessageStore mockMessageStore = mock(MessageStore.class);
+        BrokerConfig mockBrokerConfig = mock(BrokerConfig.class);
+        BrokerStatsManager mockBrokerStatsManager = 
mock(BrokerStatsManager.class);
+        PopMessageProcessor mockPopMessageProcessor = 
mock(PopMessageProcessor.class);
+        PopBufferMergeService mockPopBufferMergeService = 
mock(PopBufferMergeService.class);
+        BrokerMetricsManager brokerMetricsManager = 
mock(BrokerMetricsManager.class);
+        PopMetricsManager popMetricsManager = mock(PopMetricsManager.class);
+        EscapeBridge mockEscapeBridge = mock(EscapeBridge.class);
+
+        
when(brokerMetricsManager.getPopMetricsManager()).thenReturn(popMetricsManager);
+        
when(mockBrokerController.getBrokerMetricsManager()).thenReturn(brokerMetricsManager);
+        doNothing().when(popMetricsManager).incPopReviveCkPutCount(any(), 
any());
+        
when(mockBrokerController.getTopicConfigManager()).thenReturn(mockTopicConfigManager);
+        
when(mockBrokerController.getMessageStore()).thenReturn(mockMessageStore);
+        
when(mockBrokerController.getBrokerConfig()).thenReturn(mockBrokerConfig);
+        
when(mockBrokerController.getBrokerStatsManager()).thenReturn(mockBrokerStatsManager);
+        
when(mockBrokerController.getPopMessageProcessor()).thenReturn(mockPopMessageProcessor);
+        
when(mockPopMessageProcessor.getPopBufferMergeService()).thenReturn(mockPopBufferMergeService);
+        when(mockPopBufferMergeService.addAk(anyInt(), 
any())).thenReturn(false);
+        
when(mockBrokerController.getEscapeBridge()).thenReturn(mockEscapeBridge);
+
+        PutMessageResult mockPutMessageResult = new 
PutMessageResult(PutMessageStatus.PUT_OK, null, true);
+        when(mockEscapeBridge.asyncPutMessageToSpecificQueue(any()))
+                
.thenReturn(CompletableFuture.completedFuture(mockPutMessageResult));
+
+        TopicConfig topicConfig = new TopicConfig();
+        topicConfig.setReadQueueNums(4);
+        
when(mockTopicConfigManager.selectTopicConfig(anyString())).thenReturn(topicConfig);
+        when(mockMessageStore.getMinOffsetInQueue(anyString(), 
anyInt())).thenReturn(0L);
+        when(mockMessageStore.getMaxOffsetInQueue(anyString(), 
anyInt())).thenReturn(10L);
+        
when(mockBrokerConfig.isPopConsumerKVServiceEnable()).thenReturn(false);
+
+        ChangeInvisibleTimeRequestHeader requestHeader = new 
ChangeInvisibleTimeRequestHeader();
+        requestHeader.setTopic("TestTopic");
+        requestHeader.setQueueId(1);
+        requestHeader.setOffset(5L);
+        requestHeader.setConsumerGroup("TestGroup");
+        requestHeader.setExtraInfo("0 10000 10000 0 TestBroker 1");
+        requestHeader.setInvisibleTime(60000L);
+        requestHeader.setSuspend(false); // Test with suspend=false
+        
when(mockRequest.decodeCommandCustomHeader(ChangeInvisibleTimeRequestHeader.class)).thenReturn(requestHeader);
+
+        ChangeInvisibleTimeProcessor processor = new 
ChangeInvisibleTimeProcessor(mockBrokerController);
+        CompletableFuture<RemotingCommand> futureResponse = 
processor.processRequestAsync(mockChannel, mockRequest, true);
+
+        RemotingCommand response = futureResponse.get();
+        assertNotNull(response);
+        assertEquals(ResponseCode.SUCCESS, response.getCode());
+    }
+
+    @Test
+    public void testProcessRequestWithSuspendTrue() throws 
RemotingCommandException, ConsumeQueueException {
+        when(messageStore.getMaxOffsetInQueue(anyString(), 
anyInt())).thenReturn(2L);
+        
when(escapeBridge.asyncPutMessageToSpecificQueue(any(MessageExtBrokerInner.class))).thenReturn(CompletableFuture.completedFuture(new
 PutMessageResult(PutMessageStatus.PUT_OK, new 
AppendMessageResult(AppendMessageStatus.PUT_OK))));
+        int queueId = 0;
+        long queueOffset = 0;
+        long popTime = System.currentTimeMillis() - 1_000;
+        long invisibleTime = 30_000;
+        int reviveQid = 0;
+        String brokerName = "test_broker";
+        String extraInfo = ExtraInfoUtil.buildExtraInfo(queueOffset, popTime, 
invisibleTime, reviveQid,
+            topic, brokerName, queueId) + MessageConst.KEY_SEPARATOR + 
queueOffset;
+
+        ChangeInvisibleTimeRequestHeader requestHeader = new 
ChangeInvisibleTimeRequestHeader();
+        requestHeader.setTopic(topic);
+        requestHeader.setQueueId(queueId);
+        requestHeader.setOffset(queueOffset);
+        requestHeader.setConsumerGroup(group);
+        requestHeader.setExtraInfo(extraInfo);
+        requestHeader.setInvisibleTime(invisibleTime);
+        requestHeader.setSuspend(true); // Set suspend to true
+
+        final RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, 
requestHeader);
+        request.makeCustomHeaderToNet();
+        RemotingCommand responseToReturn = 
changeInvisibleTimeProcessor.processRequest(handlerContext, request);
+        assertThat(responseToReturn.getCode()).isEqualTo(ResponseCode.SUCCESS);
+        
assertThat(responseToReturn.getOpaque()).isEqualTo(request.getOpaque());
+    }
+
+    @Test
+    public void testProcessRequestWithSuspendFalse() throws 
RemotingCommandException, ConsumeQueueException {
+        when(messageStore.getMaxOffsetInQueue(anyString(), 
anyInt())).thenReturn(2L);
+        
when(escapeBridge.asyncPutMessageToSpecificQueue(any(MessageExtBrokerInner.class))).thenReturn(CompletableFuture.completedFuture(new
 PutMessageResult(PutMessageStatus.PUT_OK, new 
AppendMessageResult(AppendMessageStatus.PUT_OK))));
+        int queueId = 0;
+        long queueOffset = 0;
+        long popTime = System.currentTimeMillis() - 1_000;
+        long invisibleTime = 30_000;
+        int reviveQid = 0;
+        String brokerName = "test_broker";
+        String extraInfo = ExtraInfoUtil.buildExtraInfo(queueOffset, popTime, 
invisibleTime, reviveQid,
+            topic, brokerName, queueId) + MessageConst.KEY_SEPARATOR + 
queueOffset;
+
+        ChangeInvisibleTimeRequestHeader requestHeader = new 
ChangeInvisibleTimeRequestHeader();
+        requestHeader.setTopic(topic);
+        requestHeader.setQueueId(queueId);
+        requestHeader.setOffset(queueOffset);
+        requestHeader.setConsumerGroup(group);
+        requestHeader.setExtraInfo(extraInfo);
+        requestHeader.setInvisibleTime(invisibleTime);
+        requestHeader.setSuspend(false); // Set suspend to false
+
+        final RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, 
requestHeader);
+        request.makeCustomHeaderToNet();
+        RemotingCommand responseToReturn = 
changeInvisibleTimeProcessor.processRequest(handlerContext, request);
+        assertThat(responseToReturn.getCode()).isEqualTo(ResponseCode.SUCCESS);
+        
assertThat(responseToReturn.getOpaque()).isEqualTo(request.getOpaque());
+    }
+
+    @Test
+    public void 
testAppendCheckPointThenAckOriginWritesSuspendTrueInCheckpoint() throws 
Exception {
+        when(messageStore.getMaxOffsetInQueue(anyString(), 
anyInt())).thenReturn(2L);
+        ArgumentCaptor<MessageExtBrokerInner> msgCaptor = 
ArgumentCaptor.forClass(MessageExtBrokerInner.class);
+        when(escapeBridge.asyncPutMessageToSpecificQueue(msgCaptor.capture()))
+            .thenReturn(CompletableFuture.completedFuture(new 
PutMessageResult(PutMessageStatus.PUT_OK, new 
AppendMessageResult(AppendMessageStatus.PUT_OK))));
+
+        int queueId = 0;
+        long queueOffset = 0;
+        long popTime = System.currentTimeMillis() - 1_000;
+        long invisibleTime = 30_000;
+        int reviveQid = 0;
+        String brokerName = "test_broker";
+        String extraInfo = ExtraInfoUtil.buildExtraInfo(queueOffset, popTime, 
invisibleTime, reviveQid,
+            topic, brokerName, queueId) + MessageConst.KEY_SEPARATOR + 
queueOffset;
+
+        ChangeInvisibleTimeRequestHeader requestHeader = new 
ChangeInvisibleTimeRequestHeader();
+        requestHeader.setTopic(topic);
+        requestHeader.setQueueId(queueId);
+        requestHeader.setOffset(queueOffset);
+        requestHeader.setConsumerGroup(group);
+        requestHeader.setExtraInfo(extraInfo);
+        requestHeader.setInvisibleTime(invisibleTime);
+        requestHeader.setSuspend(true);
+
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, 
requestHeader);
+        request.makeCustomHeaderToNet();
+        changeInvisibleTimeProcessor.processRequest(handlerContext, request);
+
+        List<MessageExtBrokerInner> allValues = msgCaptor.getAllValues();
+        MessageExtBrokerInner ckMessage = allValues.stream()
+            .filter(m -> PopAckConstants.CK_TAG.equals(m.getTags()))
+            .findFirst()
+            .orElseThrow(() -> new AssertionError("No CK message captured"));
+        PopCheckPoint ck = JSON.parseObject(new String(ckMessage.getBody(), 
java.nio.charset.StandardCharsets.UTF_8), PopCheckPoint.class);
+        assertThat(ck.isSuspend()).isTrue();
+    }
+
+    @Test
+    public void 
testAppendCheckPointThenAckOriginWritesSuspendFalseInCheckpoint() throws 
Exception {
+        when(messageStore.getMaxOffsetInQueue(anyString(), 
anyInt())).thenReturn(2L);
+        ArgumentCaptor<MessageExtBrokerInner> msgCaptor = 
ArgumentCaptor.forClass(MessageExtBrokerInner.class);
+        when(escapeBridge.asyncPutMessageToSpecificQueue(msgCaptor.capture()))
+            .thenReturn(CompletableFuture.completedFuture(new 
PutMessageResult(PutMessageStatus.PUT_OK, new 
AppendMessageResult(AppendMessageStatus.PUT_OK))));
+
+        int queueId = 0;
+        long queueOffset = 0;
+        long popTime = System.currentTimeMillis() - 1_000;
+        long invisibleTime = 30_000;
+        int reviveQid = 0;
+        String brokerName = "test_broker";
+        String extraInfo = ExtraInfoUtil.buildExtraInfo(queueOffset, popTime, 
invisibleTime, reviveQid,
+            topic, brokerName, queueId) + MessageConst.KEY_SEPARATOR + 
queueOffset;
+
+        ChangeInvisibleTimeRequestHeader requestHeader = new 
ChangeInvisibleTimeRequestHeader();
+        requestHeader.setTopic(topic);
+        requestHeader.setQueueId(queueId);
+        requestHeader.setOffset(queueOffset);
+        requestHeader.setConsumerGroup(group);
+        requestHeader.setExtraInfo(extraInfo);
+        requestHeader.setInvisibleTime(invisibleTime);
+        requestHeader.setSuspend(false);
+
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, 
requestHeader);
+        request.makeCustomHeaderToNet();
+        changeInvisibleTimeProcessor.processRequest(handlerContext, request);
+
+        List<MessageExtBrokerInner> allValues = msgCaptor.getAllValues();
+        MessageExtBrokerInner ckMessage = allValues.stream()
+            .filter(m -> PopAckConstants.CK_TAG.equals(m.getTags()))
+            .findFirst()
+            .orElseThrow(() -> new AssertionError("No CK message captured"));
+        PopCheckPoint ck = JSON.parseObject(new String(ckMessage.getBody(), 
java.nio.charset.StandardCharsets.UTF_8), PopCheckPoint.class);
+        assertThat(ck.isSuspend()).isFalse();
+    }
 }
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageResponseStreamWriter.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageResponseStreamWriter.java
index 843c0edec1..69bd2a6bc4 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageResponseStreamWriter.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageResponseStreamWriter.java
@@ -136,7 +136,10 @@ public class ReceiveMessageResponseStreamWriter {
             messageExt.getMsgId(),
             request.getGroup().getName(),
             request.getMessageQueue().getTopic().getName(),
-            NACK_INVISIBLE_TIME
+            NACK_INVISIBLE_TIME,
+            null,
+            MessagingProcessor.DEFAULT_TIMEOUT_MILLS,
+            true
         );
     }
 
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
index cd93aed0f7..dc0f86aae5 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
@@ -198,6 +198,18 @@ public class ConsumerProcessor extends AbstractProcessor {
                                 liteTopic,
                                 MessagingProcessor.DEFAULT_TIMEOUT_MILLS);
                             break;
+                        case TO_RETURN:
+                            this.messagingProcessor.changeInvisibleTime(
+                                    ctx,
+                                    ReceiptHandle.decode(handleString),
+                                    messageExt.getMsgId(),
+                                    consumerGroup,
+                                    topic,
+                                    MessagingProcessor.INVISIBLE_TIME_MS,
+                                    null,
+                                    MessagingProcessor.DEFAULT_TIMEOUT_MILLS,
+                                    true);
+                            break;
                         case MATCH:
                         default:
                             messageExtList.add(messageExt);
@@ -392,8 +404,8 @@ public class ConsumerProcessor extends AbstractProcessor {
             });
     }
 
-    public CompletableFuture<AckResult> changeInvisibleTime(ProxyContext ctx, 
ReceiptHandle handle,
-        String messageId, String groupName, String topicName, long 
invisibleTime, String liteTopic, long timeoutMillis) {
+    public CompletableFuture<AckResult> changeInvisibleTime(ProxyContext ctx, 
ReceiptHandle handle, String messageId,
+        String groupName, String topicName, long invisibleTime, String 
liteTopic, long timeoutMillis, boolean suspend) {
         CompletableFuture<AckResult> future = new CompletableFuture<>();
         try {
             this.validateReceiptHandle(handle);
@@ -406,6 +418,7 @@ public class ConsumerProcessor extends AbstractProcessor {
             changeInvisibleTimeRequestHeader.setOffset(handle.getOffset());
             changeInvisibleTimeRequestHeader.setInvisibleTime(invisibleTime);
             changeInvisibleTimeRequestHeader.setLiteTopic(liteTopic);
+            changeInvisibleTimeRequestHeader.setSuspend(suspend);
             long commitLogOffset = handle.getCommitLogOffset();
 
             future = 
this.serviceManager.getMessageService().changeInvisibleTime(
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
index bc044ec7a1..60c9261050 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
@@ -234,16 +234,9 @@ public class DefaultMessagingProcessor extends 
AbstractStartAndShutdown implemen
 
     @Override
     public CompletableFuture<AckResult> changeInvisibleTime(ProxyContext ctx, 
ReceiptHandle handle, String messageId,
-        String groupName, String topicName, long invisibleTime, long 
timeoutMillis) {
+        String groupName, String topicName, long invisibleTime, String 
liteTopic, long timeoutMillis, boolean suspend) {
         return this.consumerProcessor.changeInvisibleTime(ctx, handle, 
messageId, groupName, topicName,
-            invisibleTime, null, timeoutMillis);
-    }
-
-    @Override
-    public CompletableFuture<AckResult> changeInvisibleTime(ProxyContext ctx, 
ReceiptHandle handle, String messageId,
-        String groupName, String topicName, long invisibleTime, String 
liteTopic, long timeoutMillis) {
-        return this.consumerProcessor.changeInvisibleTime(ctx, handle, 
messageId, groupName, topicName,
-            invisibleTime, liteTopic, timeoutMillis);
+            invisibleTime, liteTopic, timeoutMillis, suspend);
     }
 
     @Override
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
index e2c3da6745..a1500dbded 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
@@ -52,6 +52,8 @@ public interface MessagingProcessor extends StartAndShutdown {
 
     long DEFAULT_TIMEOUT_MILLS = Duration.ofSeconds(2).toMillis();
 
+    long INVISIBLE_TIME_MS = Duration.ofSeconds(1).toMillis();
+
     SubscriptionGroupConfig getSubscriptionGroupConfig(
         ProxyContext ctx,
         String consumerGroupName
@@ -243,7 +245,7 @@ public interface MessagingProcessor extends 
StartAndShutdown {
         return changeInvisibleTime(ctx, handle, messageId, groupName, 
topicName, invisibleTime, DEFAULT_TIMEOUT_MILLS);
     }
 
-    CompletableFuture<AckResult> changeInvisibleTime(
+    default CompletableFuture<AckResult> changeInvisibleTime(
         ProxyContext ctx,
         ReceiptHandle handle,
         String messageId,
@@ -251,7 +253,9 @@ public interface MessagingProcessor extends 
StartAndShutdown {
         String topicName,
         long invisibleTime,
         long timeoutMillis
-    );
+    ) {
+        return changeInvisibleTime(ctx, handle, messageId, groupName, 
topicName, invisibleTime, null, timeoutMillis, false);
+    }
 
     default CompletableFuture<AckResult> changeInvisibleTime(
         ProxyContext ctx,
@@ -262,7 +266,7 @@ public interface MessagingProcessor extends 
StartAndShutdown {
         long invisibleTime,
         String liteTopic
     ) {
-        return changeInvisibleTime(ctx, handle, messageId, groupName, 
topicName, invisibleTime, liteTopic, DEFAULT_TIMEOUT_MILLS);
+        return changeInvisibleTime(ctx, handle, messageId, groupName, 
topicName, invisibleTime, liteTopic, DEFAULT_TIMEOUT_MILLS, false);
     }
 
     CompletableFuture<AckResult> changeInvisibleTime(
@@ -273,7 +277,8 @@ public interface MessagingProcessor extends 
StartAndShutdown {
         String topicName,
         long invisibleTime,
         String liteTopic,
-        long timeoutMillis
+        long timeoutMillis,
+        boolean suspend
     );
 
     CompletableFuture<PullResult> pullMessage(
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/PopMessageResultFilter.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/PopMessageResultFilter.java
index 09c1a0bf1a..60e888ca3d 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/PopMessageResultFilter.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/PopMessageResultFilter.java
@@ -25,7 +25,8 @@ public interface PopMessageResultFilter {
     enum FilterResult {
         TO_DLQ,
         NO_MATCH,
-        MATCH
+        MATCH,
+        TO_RETURN
     }
 
     FilterResult filterMessage(ProxyContext ctx, String consumerGroup, 
SubscriptionData subscriptionData,
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
index b002db19b5..f7074dedd6 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
@@ -276,7 +276,10 @@ public class ReceiveMessageActivityTest extends 
BaseActivityTest {
             msgIdCaptor.capture(),
             anyString(),
             anyString(),
-            anyLong())).thenReturn(CompletableFuture.completedFuture(new 
AckResult()));
+            anyLong(),
+            any(),
+            anyLong(),
+            anyBoolean())).thenReturn(CompletableFuture.completedFuture(new 
AckResult()));
 
         // normal
         ProxyContext ctx = createContext();
@@ -308,7 +311,10 @@ public class ReceiveMessageActivityTest extends 
BaseActivityTest {
             anyString(),
             anyString(),
             anyString(),
-            anyLong());
+            anyLong(),
+            any(),
+            anyLong(),
+            anyBoolean());
         assertEquals(Arrays.asList(msgId1, msgId2), 
msgIdCaptor.getAllValues());
         assertEquals(Arrays.asList(popCk1, popCk2), 
receiptHandleCaptor.getAllValues().stream().map(ReceiptHandle::encode).collect(Collectors.toList()));
     }
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageResponseStreamWriterTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageResponseStreamWriterTest.java
index a717c78ca1..2bc281376e 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageResponseStreamWriterTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageResponseStreamWriterTest.java
@@ -38,6 +38,7 @@ import org.apache.rocketmq.common.message.MessageAccessor;
 import org.apache.rocketmq.common.message.MessageClientIDSetter;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExt;
+import java.lang.reflect.Method;
 import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.proxy.grpc.v2.BaseActivityTest;
 import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
@@ -48,8 +49,10 @@ import org.mockito.ArgumentCaptor;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
@@ -76,7 +79,7 @@ public class ReceiveMessageResponseStreamWriterTest extends 
BaseActivityTest {
     public void testWriteMessage() {
         ArgumentCaptor<String> changeInvisibleTimeMsgIdCaptor = 
ArgumentCaptor.forClass(String.class);
         
doReturn(CompletableFuture.completedFuture(mock(AckResult.class))).when(this.messagingProcessor)
-            .changeInvisibleTime(any(), any(), 
changeInvisibleTimeMsgIdCaptor.capture(), anyString(), anyString(), anyLong());
+            .changeInvisibleTime(any(), any(), 
changeInvisibleTimeMsgIdCaptor.capture(), anyString(), anyString(), anyLong(), 
any(), anyLong(), anyBoolean());
 
         ArgumentCaptor<ReceiveMessageResponse> responseArgumentCaptor = 
ArgumentCaptor.forClass(ReceiveMessageResponse.class);
         AtomicInteger onNextCallNum = new AtomicInteger(0);
@@ -108,7 +111,7 @@ public class ReceiveMessageResponseStreamWriterTest extends 
BaseActivityTest {
         verify(streamObserver, times(1)).onCompleted();
         verify(streamObserver, times(4)).onNext(any());
         verify(this.messagingProcessor, times(1))
-            .changeInvisibleTime(any(), any(), anyString(), anyString(), 
anyString(), anyLong());
+            .changeInvisibleTime(any(), any(), anyString(), anyString(), 
anyString(), anyLong(), any(), anyLong(), eq(true));
 
         assertTrue(responseArgumentCaptor.getAllValues().get(0).hasStatus());
         assertEquals(Code.OK, 
responseArgumentCaptor.getAllValues().get(0).getStatus().getCode());
@@ -125,7 +128,7 @@ public class ReceiveMessageResponseStreamWriterTest extends 
BaseActivityTest {
             popResult
         );
         verify(this.messagingProcessor, times(3))
-            .changeInvisibleTime(any(), any(), anyString(), anyString(), 
anyString(), anyLong());
+            .changeInvisibleTime(any(), any(), anyString(), anyString(), 
anyString(), anyLong(), any(), anyLong(), eq(true));
     }
 
     @Test
@@ -152,6 +155,58 @@ public class ReceiveMessageResponseStreamWriterTest 
extends BaseActivityTest {
         assertEquals(Code.TOO_MANY_REQUESTS, response.getStatus().getCode());
     }
 
+    @Test
+    public void testNackMessageWithSuspendTrue() {
+        ArgumentCaptor<String> changeInvisibleTimeMsgIdCaptor = 
ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<String> changeInvisibleTimeGroupCaptor = 
ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<String> changeInvisibleTimeTopicCaptor = 
ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<Long> changeInvisibleTimeInvisibleTimeCaptor = 
ArgumentCaptor.forClass(Long.class);
+        ArgumentCaptor<Boolean> changeInvisibleTimeSuspendCaptor = 
ArgumentCaptor.forClass(Boolean.class);
+
+        
doReturn(CompletableFuture.completedFuture(mock(AckResult.class))).when(this.messagingProcessor)
+            .changeInvisibleTime(any(), any(), 
changeInvisibleTimeMsgIdCaptor.capture(),
+                changeInvisibleTimeGroupCaptor.capture(), 
changeInvisibleTimeTopicCaptor.capture(),
+                changeInvisibleTimeInvisibleTimeCaptor.capture(), any(), 
anyLong(),
+                changeInvisibleTimeSuspendCaptor.capture());
+
+        MessageExt messageExt = createMessageExt(TOPIC, "tag");
+        ReceiveMessageRequest receiveMessageRequest = 
ReceiveMessageRequest.newBuilder()
+            .setGroup(Resource.newBuilder().setName(CONSUMER_GROUP).build())
+            
.setMessageQueue(MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(TOPIC).build()).build())
+            .build();
+
+        // Simulate nack by calling processThrowableWhenWriteMessage using 
reflection
+        // This is called when an exception occurs during message processing
+        try {
+            Method method = 
ReceiveMessageResponseStreamWriter.class.getDeclaredMethod(
+                "processThrowableWhenWriteMessage",
+                Throwable.class, ProxyContext.class, 
ReceiveMessageRequest.class, MessageExt.class);
+            method.setAccessible(true);
+            method.invoke(writer,
+                new RuntimeException("Test exception"),
+                ProxyContext.create(),
+                receiveMessageRequest,
+                messageExt);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+        // Verify that changeInvisibleTime was called with suspend=true
+        verify(this.messagingProcessor, times(1))
+            .changeInvisibleTime(any(), any(), eq(messageExt.getMsgId()),
+                eq(CONSUMER_GROUP), eq(TOPIC), 
eq(ReceiveMessageResponseStreamWriter.NACK_INVISIBLE_TIME),
+                eq(null), 
eq(org.apache.rocketmq.proxy.processor.MessagingProcessor.DEFAULT_TIMEOUT_MILLS),
+                eq(true));
+
+        assertEquals(messageExt.getMsgId(), 
changeInvisibleTimeMsgIdCaptor.getValue());
+        assertEquals(CONSUMER_GROUP, 
changeInvisibleTimeGroupCaptor.getValue());
+        assertEquals(TOPIC, changeInvisibleTimeTopicCaptor.getValue());
+        assertEquals(ReceiveMessageResponseStreamWriter.NACK_INVISIBLE_TIME,
+            changeInvisibleTimeInvisibleTimeCaptor.getValue().longValue());
+        assertTrue("Suspend should be true for nack", 
changeInvisibleTimeSuspendCaptor.getValue());
+    }
+
+
     private static MessageExt createMessageExt(String topic, String tags) {
         String msgId = MessageClientIDSetter.createUniqID();
 
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java
index 9b203ef1f6..4eee5a079c 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java
@@ -61,9 +61,11 @@ import org.mockito.stubbing.Answer;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyLong;
@@ -290,7 +292,7 @@ public class ConsumerProcessorTest extends 
BaseProcessorTest {
             .thenReturn(CompletableFuture.completedFuture(innerAckResult));
 
         AckResult ackResult = 
this.consumerProcessor.changeInvisibleTime(createContext(), handle, 
MessageClientIDSetter.createUniqID(),
-            CONSUMER_GROUP, TOPIC, 1000, null, 3000).get();
+            CONSUMER_GROUP, TOPIC, 1000, null, 3000, true).get();
 
         assertEquals(AckStatus.OK, ackResult.getStatus());
         assertEquals(KeyBuilder.buildPopRetryTopic(TOPIC, CONSUMER_GROUP, new 
BrokerConfig().isEnableRetryTopicV2()), 
requestHeaderArgumentCaptor.getValue().getTopic());
@@ -357,4 +359,106 @@ public class ConsumerProcessorTest extends 
BaseProcessorTest {
             .get();
         assertThat(result).isEqualTo(Sets.newHashSet(mq1));
     }
+
+    @Test
+    public void testPopMessageWithToReturnFilter() throws Throwable {
+        final String tag = "tag";
+        final long invisibleTime = Duration.ofSeconds(15).toMillis();
+        ArgumentCaptor<AddressableMessageQueue> messageQueueArgumentCaptor = 
ArgumentCaptor.forClass(AddressableMessageQueue.class);
+        ArgumentCaptor<PopMessageRequestHeader> requestHeaderArgumentCaptor = 
ArgumentCaptor.forClass(PopMessageRequestHeader.class);
+
+        List<MessageExt> messageExtList = new ArrayList<>();
+        messageExtList.add(createMessageExt(TOPIC, tag, 0, invisibleTime));
+        PopResult innerPopResult = new PopResult(PopStatus.FOUND, 
messageExtList);
+        when(this.messageService.popMessage(any(), 
messageQueueArgumentCaptor.capture(), requestHeaderArgumentCaptor.capture(), 
anyLong()))
+            .thenReturn(CompletableFuture.completedFuture(innerPopResult));
+
+        when(this.topicRouteService.getCurrentMessageQueueView(any(), 
anyString()))
+            .thenReturn(mock(MessageQueueView.class));
+
+        ArgumentCaptor<String> ackMessageIdArgumentCaptor = 
ArgumentCaptor.forClass(String.class);
+        when(this.messagingProcessor.ackMessage(any(), any(), 
ackMessageIdArgumentCaptor.capture(), anyString(), anyString(), any(), 
anyLong()))
+            
.thenReturn(CompletableFuture.completedFuture(mock(AckResult.class)));
+
+        ArgumentCaptor<String> changeInvisibleTimeMessageIdArgumentCaptor = 
ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<Long> changeInvisibleTimeInvisibleTimeArgumentCaptor = 
ArgumentCaptor.forClass(Long.class);
+        ArgumentCaptor<Boolean> changeInvisibleTimeSuspendArgumentCaptor = 
ArgumentCaptor.forClass(Boolean.class);
+        when(this.messagingProcessor.changeInvisibleTime(any(), any(), 
changeInvisibleTimeMessageIdArgumentCaptor.capture(),
+            anyString(), anyString(), 
changeInvisibleTimeInvisibleTimeArgumentCaptor.capture(), any(), anyLong(),
+            changeInvisibleTimeSuspendArgumentCaptor.capture()))
+            
.thenReturn(CompletableFuture.completedFuture(mock(AckResult.class)));
+
+        AddressableMessageQueue messageQueue = 
mock(AddressableMessageQueue.class);
+        PopResult popResult = this.consumerProcessor.popMessage(
+            createContext(),
+            (ctx, messageQueueView) -> messageQueue,
+            CONSUMER_GROUP,
+            TOPIC,
+            60,
+            invisibleTime,
+            Duration.ofSeconds(3).toMillis(),
+            ConsumeInitMode.MAX,
+            FilterAPI.build(TOPIC, tag, ExpressionType.TAG),
+            false,
+            (ctx, consumerGroup, subscriptionData, messageExt) -> {
+                // Return TO_RETURN for the message
+                return PopMessageResultFilter.FilterResult.TO_RETURN;
+            },
+            null,
+            Duration.ofSeconds(3).toMillis()
+        ).get();
+
+        // Verify that changeInvisibleTime was called with suspend=true
+        verify(this.messagingProcessor).changeInvisibleTime(any(), any(), 
eq(messageExtList.get(0).getMsgId()),
+            eq(CONSUMER_GROUP), eq(TOPIC), 
eq(Duration.ofSeconds(1).toMillis()), eq(null),
+            eq(MessagingProcessor.DEFAULT_TIMEOUT_MILLS), eq(true));
+
+        // Verify that the message was NOT added to the result list
+        assertEquals(PopStatus.FOUND, popResult.getPopStatus());
+        assertEquals(0, popResult.getMsgFoundList().size());
+    }
+
+    @Test
+    public void testChangeInvisibleTimeWithSuspendFalse() throws Throwable {
+        ReceiptHandle handle = 
create(createMessageExt(MixAll.RETRY_GROUP_TOPIC_PREFIX + TOPIC, "", 0, 3000));
+        assertNotNull(handle);
+
+        ArgumentCaptor<ChangeInvisibleTimeRequestHeader> 
requestHeaderArgumentCaptor = 
ArgumentCaptor.forClass(ChangeInvisibleTimeRequestHeader.class);
+        AckResult innerAckResult = new AckResult();
+        innerAckResult.setStatus(AckStatus.OK);
+        when(this.messageService.changeInvisibleTime(any(), any(), 
anyString(), requestHeaderArgumentCaptor.capture(), anyLong()))
+            .thenReturn(CompletableFuture.completedFuture(innerAckResult));
+
+        AckResult ackResult = 
this.consumerProcessor.changeInvisibleTime(createContext(), handle, 
MessageClientIDSetter.createUniqID(),
+            CONSUMER_GROUP, TOPIC, 1000, null, 3000, false).get();
+
+        assertEquals(AckStatus.OK, ackResult.getStatus());
+        assertEquals(KeyBuilder.buildPopRetryTopic(TOPIC, CONSUMER_GROUP, new 
BrokerConfig().isEnableRetryTopicV2()), 
requestHeaderArgumentCaptor.getValue().getTopic());
+        assertEquals(CONSUMER_GROUP, 
requestHeaderArgumentCaptor.getValue().getConsumerGroup());
+        assertEquals(1000, 
requestHeaderArgumentCaptor.getValue().getInvisibleTime().longValue());
+        assertEquals(handle.getReceiptHandle(), 
requestHeaderArgumentCaptor.getValue().getExtraInfo());
+        assertFalse("Suspend should be false", 
requestHeaderArgumentCaptor.getValue().isSuspend());
+    }
+
+    @Test
+    public void testChangeInvisibleTimeWithSuspendTrue() throws Throwable {
+        ReceiptHandle handle = 
create(createMessageExt(MixAll.RETRY_GROUP_TOPIC_PREFIX + TOPIC, "", 0, 3000));
+        assertNotNull(handle);
+
+        ArgumentCaptor<ChangeInvisibleTimeRequestHeader> 
requestHeaderArgumentCaptor = 
ArgumentCaptor.forClass(ChangeInvisibleTimeRequestHeader.class);
+        AckResult innerAckResult = new AckResult();
+        innerAckResult.setStatus(AckStatus.OK);
+        when(this.messageService.changeInvisibleTime(any(), any(), 
anyString(), requestHeaderArgumentCaptor.capture(), anyLong()))
+            .thenReturn(CompletableFuture.completedFuture(innerAckResult));
+
+        AckResult ackResult = 
this.consumerProcessor.changeInvisibleTime(createContext(), handle, 
MessageClientIDSetter.createUniqID(),
+            CONSUMER_GROUP, TOPIC, 1000, null, 3000, true).get();
+
+        assertEquals(AckStatus.OK, ackResult.getStatus());
+        assertEquals(KeyBuilder.buildPopRetryTopic(TOPIC, CONSUMER_GROUP, new 
BrokerConfig().isEnableRetryTopicV2()), 
requestHeaderArgumentCaptor.getValue().getTopic());
+        assertEquals(CONSUMER_GROUP, 
requestHeaderArgumentCaptor.getValue().getConsumerGroup());
+        assertEquals(1000, 
requestHeaderArgumentCaptor.getValue().getInvisibleTime().longValue());
+        assertEquals(handle.getReceiptHandle(), 
requestHeaderArgumentCaptor.getValue().getExtraInfo());
+        assertTrue("Suspend should be true", 
requestHeaderArgumentCaptor.getValue().isSuspend());
+    }
 }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ChangeInvisibleTimeRequestHeader.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ChangeInvisibleTimeRequestHeader.java
index 9d44590da3..a80b2cfb6d 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ChangeInvisibleTimeRequestHeader.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ChangeInvisibleTimeRequestHeader.java
@@ -50,6 +50,8 @@ public class ChangeInvisibleTimeRequestHeader extends 
TopicQueueRequestHeader {
 
     private String liteTopic;
 
+    private boolean suspend = false;
+
     @Override
     public void checkFields() throws RemotingCommandException {
     }
@@ -113,6 +115,14 @@ public class ChangeInvisibleTimeRequestHeader extends 
TopicQueueRequestHeader {
         this.liteTopic = liteTopic;
     }
 
+    public boolean isSuspend() {
+        return suspend;
+    }
+
+    public void setSuspend(boolean suspend) {
+        this.suspend = suspend;
+    }
+
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(this)
@@ -123,6 +133,7 @@ public class ChangeInvisibleTimeRequestHeader extends 
TopicQueueRequestHeader {
             .add("offset", offset)
             .add("invisibleTime", invisibleTime)
             .add("liteTopic", liteTopic)
+            .add("suspend", suspend)
             .omitNullValues()
             .toString();
     }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java 
b/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java
index e3587aa28c..803ebc6895 100644
--- a/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java
+++ b/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java
@@ -45,6 +45,8 @@ public class PopCheckPoint implements 
Comparable<PopCheckPoint> {
     String brokerName;
     @JSONField(name = "rp")
     String rePutTimes; // ck rePut times
+    @JSONField(name = "sp")
+    private boolean suspend; // nack without incrementing reconsume times; 
defaults to false.
 
     public long getReviveOffset() {
         return reviveOffset;
@@ -148,6 +150,14 @@ public class PopCheckPoint implements 
Comparable<PopCheckPoint> {
         this.rePutTimes = rePutTimes;
     }
 
+    public boolean isSuspend() {
+        return suspend;
+    }
+
+    public void setSuspend(boolean suspend) {
+        this.suspend = suspend;
+    }
+
     public void addDiff(int diff) {
         if (this.queueOffsetDiff == null) {
             this.queueOffsetDiff = new ArrayList<>(8);
@@ -197,7 +207,7 @@ public class PopCheckPoint implements 
Comparable<PopCheckPoint> {
     @Override
     public String toString() {
         return "PopCheckPoint [topic=" + topic + ", cid=" + cid + ", queueId=" 
+ queueId + ", startOffset=" + startOffset + ", bitMap=" + bitMap + ", num=" + 
num + ", reviveTime=" + getReviveTime()
-            + ", reviveOffset=" + reviveOffset + ", diff=" + queueOffsetDiff + 
", brokerName=" + brokerName + ", rePutTimes=" + rePutTimes + "]";
+            + ", reviveOffset=" + reviveOffset + ", diff=" + queueOffsetDiff + 
", brokerName=" + brokerName + ", rePutTimes=" + rePutTimes + ", suspend=" + 
suspend + "]";
     }
 
     @Override

Reply via email to