This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 3b78ab0960 [ISSUE #9632] Fix Pop Long-polling Not Awakened for V1 Retry Messages (#9828)
3b78ab0960 is described below

commit 3b78ab09608fed81c4bcb1d621d82520a89e818b
Author: qianye <[email protected]>
AuthorDate: Wed Nov 12 20:01:23 2025 +0800

    [ISSUE #9632] Fix Pop Long-polling Not Awakened for V1 Retry Messages (#9828)
---
 .../broker/longpolling/PopLongPollingService.java  | 34 ++++++++++++++++++----
 .../rocketmq/broker/pop/PopConsumerService.java    |  1 +
 .../broker/processor/PopReviveService.java         |  4 +--
 .../longpolling/PopLongPollingServiceTest.java     | 33 +++++++++++++++++----
 .../rocketmq/common/message/MessageConst.java      |  2 ++
 5 files changed, 61 insertions(+), 13 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
index 7068793fae..c595178d19 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
@@ -27,18 +27,22 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.KeyBuilder;
+import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.PopAckConstants;
 import org.apache.rocketmq.common.ServiceThread;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.CommandCallback;
+import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.netty.RequestTask;
+import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
 import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.store.ConsumeQueueExt;
 import org.apache.rocketmq.store.MessageFilter;
@@ -167,13 +171,31 @@ public class PopLongPollingService extends ServiceThread {
 
     public void notifyMessageArrivingWithRetryTopic(final String topic, final 
int queueId, long offset,
         Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, 
String> properties) {
-        String notifyTopic;
-        if (KeyBuilder.isPopRetryTopicV2(topic)) {
-            notifyTopic = KeyBuilder.parseNormalTopic(topic);
+        if (NamespaceUtil.isRetryTopic(topic)) {
+            notifyMessageArrivingFromRetry(topic, queueId, tagsCode, 
msgStoreTime, filterBitMap, properties);
         } else {
-            notifyTopic = topic;
+            notifyMessageArriving(topic, queueId, offset, tagsCode, 
msgStoreTime, filterBitMap, properties);
+        }
+    }
+
+    private void notifyMessageArrivingFromRetry(String topic, int queueId, 
Long tagsCode, long msgStoreTime, byte[] filterBitMap,
+        Map<String, String> properties) {
+        String prefix = MixAll.RETRY_GROUP_TOPIC_PREFIX;
+        String originGroup = 
properties.get(MessageConst.PROPERTY_ORIGIN_GROUP);
+        // In the case of pop consumption, there is no long polling hanging on 
the retry topic, so the wake-up is skipped.
+        if (StringUtils.isBlank(originGroup)) {
+            return;
+        }
+        // %RETRY%GROUP is used for pull mode, so the wake-up is skipped.
+        int originTopicStartIndex = prefix.length() + originGroup.length() + 1;
+        if (topic.length() <= originTopicStartIndex) {
+            return;
+        }
+        String originTopic = topic.substring(originTopicStartIndex);
+        if (queueId >= 0) {
+            notifyMessageArriving(originTopic, -1, originGroup, true, 
tagsCode, msgStoreTime, filterBitMap, properties);
         }
-        notifyMessageArriving(notifyTopic, queueId, offset, tagsCode, 
msgStoreTime, filterBitMap, properties);
+        notifyMessageArriving(originTopic, queueId, originGroup, true, 
tagsCode, msgStoreTime, filterBitMap, properties);
     }
 
     public void notifyMessageArriving(final String topic, final int queueId, 
long offset,
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
index 277a4999cf..57fac798b2 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
@@ -624,6 +624,7 @@ public class PopConsumerService extends ServiceThread {
             msgInner.getProperties().get(MessageConst.PROPERTY_FIRST_POP_TIME) 
== null) {
             msgInner.getProperties().put(MessageConst.PROPERTY_FIRST_POP_TIME, 
String.valueOf(record.getPopTime()));
         }
+        msgInner.getProperties().put(MessageConst.PROPERTY_ORIGIN_GROUP, 
record.getGroupId());
         
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
 
         PutMessageResult putMessageResult =
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
index 7bf3595be8..aa7d87505e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
@@ -31,17 +31,16 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.rocketmq.broker.BrokerController;
-
 import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.client.consumer.PullStatus;
 import org.apache.rocketmq.common.KeyBuilder;
 import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.common.PopAckConstants;
 import org.apache.rocketmq.common.ServiceThread;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.TopicFilterType;
+import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageAccessor;
 import org.apache.rocketmq.common.message.MessageConst;
@@ -128,6 +127,7 @@ public class PopReviveService extends ServiceThread {
         if (messageExt.getReconsumeTimes() == 0 || 
msgInner.getProperties().get(MessageConst.PROPERTY_FIRST_POP_TIME) == null) {
             msgInner.getProperties().put(MessageConst.PROPERTY_FIRST_POP_TIME, 
String.valueOf(popCheckPoint.getPopTime()));
         }
+        msgInner.getProperties().put(MessageConst.PROPERTY_ORIGIN_GROUP, 
popCheckPoint.getCId());
         
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
         addRetryTopicIfNotExist(msgInner.getTopic(), popCheckPoint.getCId());
         PutMessageResult putMessageResult = 
brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java
index 3547687a6d..23dcf5c2fd 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java
@@ -20,11 +20,18 @@ import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.KeyBuilder;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
@@ -36,11 +43,6 @@ import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.ExecutorService;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -87,6 +89,27 @@ public class PopLongPollingServiceTest {
         verify(popLongPollingService, 
times(1)).notifyMessageArrivingWithRetryTopic(defaultTopic, queueId, -1L, null, 
0L, null, null);
     }
 
+    @Test
+    public void testNotifyMessageArrivingFromRetry() {
+        int queueId = -1;
+        String group = "group";
+        String pullRetryTopic = MixAll.getRetryTopic(group);
+        String popRetryTopicV1 = KeyBuilder.buildPopRetryTopic(defaultTopic, 
group, false);
+        String popRetryTopicV2 = KeyBuilder.buildPopRetryTopic(defaultTopic, 
group, true);
+
+        Map<String, String> properties = new HashMap<>();
+        properties.putIfAbsent(MessageConst.PROPERTY_ORIGIN_GROUP, group);
+        // pull retry
+        
popLongPollingService.notifyMessageArrivingWithRetryTopic(pullRetryTopic, 
queueId, queueId, -1L, 0L, null, properties);
+        verify(popLongPollingService, 
times(0)).notifyMessageArriving(defaultTopic, queueId, group, true, -1L, 0L, 
null, properties, null);
+        // pop retry v1
+        
popLongPollingService.notifyMessageArrivingWithRetryTopic(popRetryTopicV1, 
queueId, queueId, -1L, 0L, null, properties);
+        verify(popLongPollingService, 
times(1)).notifyMessageArriving(defaultTopic, queueId, group, true, -1L, 0L, 
null, properties, null);
+        // pop retry v2
+        
popLongPollingService.notifyMessageArrivingWithRetryTopic(popRetryTopicV2, 
queueId, queueId, -1L, 0L, null, properties);
+        verify(popLongPollingService, 
times(2)).notifyMessageArriving(defaultTopic, queueId, group, true, -1L, 0L, 
null, properties, null);
+    }
+
     @Test
     public void testNotifyMessageArriving() {
         int queueId = 0;
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
index 24f7bdb99a..2bdaabebae 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
@@ -24,6 +24,7 @@ public class MessageConst {
     public static final String PROPERTY_WAIT_STORE_MSG_OK = "WAIT";
     public static final String PROPERTY_DELAY_TIME_LEVEL = "DELAY";
     public static final String PROPERTY_RETRY_TOPIC = "RETRY_TOPIC";
+    public static final String PROPERTY_ORIGIN_GROUP = "ORIGIN_GROUP";
     public static final String PROPERTY_REAL_TOPIC = "REAL_TOPIC";
     public static final String PROPERTY_REAL_QUEUE_ID = "REAL_QID";
     public static final String PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG";
@@ -113,6 +114,7 @@ public class MessageConst {
         STRING_HASH_SET.add(PROPERTY_WAIT_STORE_MSG_OK);
         STRING_HASH_SET.add(PROPERTY_DELAY_TIME_LEVEL);
         STRING_HASH_SET.add(PROPERTY_RETRY_TOPIC);
+        STRING_HASH_SET.add(PROPERTY_ORIGIN_GROUP);
         STRING_HASH_SET.add(PROPERTY_REAL_TOPIC);
         STRING_HASH_SET.add(PROPERTY_REAL_QUEUE_ID);
         STRING_HASH_SET.add(PROPERTY_TRANSACTION_PREPARED);

Reply via email to