This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 3b78ab0960 [ISSUE #9632] Fix Pop Long-polling Not Awakened for V1
Retry Messages (#9828)
3b78ab0960 is described below
commit 3b78ab09608fed81c4bcb1d621d82520a89e818b
Author: qianye <[email protected]>
AuthorDate: Wed Nov 12 20:01:23 2025 +0800
[ISSUE #9632] Fix Pop Long-polling Not Awakened for V1 Retry Messages
(#9828)
---
.../broker/longpolling/PopLongPollingService.java | 34 ++++++++++++++++++----
.../rocketmq/broker/pop/PopConsumerService.java | 1 +
.../broker/processor/PopReviveService.java | 4 +--
.../longpolling/PopLongPollingServiceTest.java | 33 +++++++++++++++++----
.../rocketmq/common/message/MessageConst.java | 2 ++
5 files changed, 61 insertions(+), 13 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
index 7068793fae..c595178d19 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
@@ -27,18 +27,22 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.KeyBuilder;
+import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PopAckConstants;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.CommandCallback;
+import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.RequestTask;
+import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.store.ConsumeQueueExt;
import org.apache.rocketmq.store.MessageFilter;
@@ -167,13 +171,31 @@ public class PopLongPollingService extends ServiceThread {
public void notifyMessageArrivingWithRetryTopic(final String topic, final
int queueId, long offset,
Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String,
String> properties) {
- String notifyTopic;
- if (KeyBuilder.isPopRetryTopicV2(topic)) {
- notifyTopic = KeyBuilder.parseNormalTopic(topic);
+ if (NamespaceUtil.isRetryTopic(topic)) {
+ notifyMessageArrivingFromRetry(topic, queueId, tagsCode,
msgStoreTime, filterBitMap, properties);
} else {
- notifyTopic = topic;
+ notifyMessageArriving(topic, queueId, offset, tagsCode,
msgStoreTime, filterBitMap, properties);
+ }
+ }
+
+ private void notifyMessageArrivingFromRetry(String topic, int queueId,
Long tagsCode, long msgStoreTime, byte[] filterBitMap,
+ Map<String, String> properties) {
+ String prefix = MixAll.RETRY_GROUP_TOPIC_PREFIX;
+ String originGroup =
properties.get(MessageConst.PROPERTY_ORIGIN_GROUP);
+ // In the case of pop consumption, there is no long polling hanging on
the retry topic, so the wake-up is skipped.
+ if (StringUtils.isBlank(originGroup)) {
+ return;
+ }
+ // %RETRY%GROUP is used for pull mode, so the wake-up is skipped.
+ int originTopicStartIndex = prefix.length() + originGroup.length() + 1;
+ if (topic.length() <= originTopicStartIndex) {
+ return;
+ }
+ String originTopic = topic.substring(originTopicStartIndex);
+ if (queueId >= 0) {
+ notifyMessageArriving(originTopic, -1, originGroup, true,
tagsCode, msgStoreTime, filterBitMap, properties);
}
- notifyMessageArriving(notifyTopic, queueId, offset, tagsCode,
msgStoreTime, filterBitMap, properties);
+ notifyMessageArriving(originTopic, queueId, originGroup, true,
tagsCode, msgStoreTime, filterBitMap, properties);
}
public void notifyMessageArriving(final String topic, final int queueId,
long offset,
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
index 277a4999cf..57fac798b2 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
@@ -624,6 +624,7 @@ public class PopConsumerService extends ServiceThread {
msgInner.getProperties().get(MessageConst.PROPERTY_FIRST_POP_TIME)
== null) {
msgInner.getProperties().put(MessageConst.PROPERTY_FIRST_POP_TIME,
String.valueOf(record.getPopTime()));
}
+ msgInner.getProperties().put(MessageConst.PROPERTY_ORIGIN_GROUP,
record.getGroupId());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
PutMessageResult putMessageResult =
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
index 7bf3595be8..aa7d87505e 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
@@ -31,17 +31,16 @@ import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.rocketmq.broker.BrokerController;
-
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.PopAckConstants;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicFilterType;
+import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
@@ -128,6 +127,7 @@ public class PopReviveService extends ServiceThread {
if (messageExt.getReconsumeTimes() == 0 ||
msgInner.getProperties().get(MessageConst.PROPERTY_FIRST_POP_TIME) == null) {
msgInner.getProperties().put(MessageConst.PROPERTY_FIRST_POP_TIME,
String.valueOf(popCheckPoint.getPopTime()));
}
+ msgInner.getProperties().put(MessageConst.PROPERTY_ORIGIN_GROUP,
popCheckPoint.getCId());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
addRetryTopicIfNotExist(msgInner.getTopic(), popCheckPoint.getCId());
PutMessageResult putMessageResult =
brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java
index 3547687a6d..23dcf5c2fd 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java
@@ -20,11 +20,18 @@ import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.KeyBuilder;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
@@ -36,11 +43,6 @@ import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.ExecutorService;
-
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -87,6 +89,27 @@ public class PopLongPollingServiceTest {
verify(popLongPollingService,
times(1)).notifyMessageArrivingWithRetryTopic(defaultTopic, queueId, -1L, null,
0L, null, null);
}
+ @Test
+ public void testNotifyMessageArrivingFromRetry() {
+ int queueId = -1;
+ String group = "group";
+ String pullRetryTopic = MixAll.getRetryTopic(group);
+ String popRetryTopicV1 = KeyBuilder.buildPopRetryTopic(defaultTopic,
group, false);
+ String popRetryTopicV2 = KeyBuilder.buildPopRetryTopic(defaultTopic,
group, true);
+
+ Map<String, String> properties = new HashMap<>();
+ properties.putIfAbsent(MessageConst.PROPERTY_ORIGIN_GROUP, group);
+ // pull retry
+
popLongPollingService.notifyMessageArrivingWithRetryTopic(pullRetryTopic,
queueId, queueId, -1L, 0L, null, properties);
+ verify(popLongPollingService,
times(0)).notifyMessageArriving(defaultTopic, queueId, group, true, -1L, 0L,
null, properties, null);
+ // pop retry v1
+
popLongPollingService.notifyMessageArrivingWithRetryTopic(popRetryTopicV1,
queueId, queueId, -1L, 0L, null, properties);
+ verify(popLongPollingService,
times(1)).notifyMessageArriving(defaultTopic, queueId, group, true, -1L, 0L,
null, properties, null);
+ // pop retry v2
+
popLongPollingService.notifyMessageArrivingWithRetryTopic(popRetryTopicV2,
queueId, queueId, -1L, 0L, null, properties);
+ verify(popLongPollingService,
times(2)).notifyMessageArriving(defaultTopic, queueId, group, true, -1L, 0L,
null, properties, null);
+ }
+
@Test
public void testNotifyMessageArriving() {
int queueId = 0;
diff --git
a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
index 24f7bdb99a..2bdaabebae 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
@@ -24,6 +24,7 @@ public class MessageConst {
public static final String PROPERTY_WAIT_STORE_MSG_OK = "WAIT";
public static final String PROPERTY_DELAY_TIME_LEVEL = "DELAY";
public static final String PROPERTY_RETRY_TOPIC = "RETRY_TOPIC";
+ public static final String PROPERTY_ORIGIN_GROUP = "ORIGIN_GROUP";
public static final String PROPERTY_REAL_TOPIC = "REAL_TOPIC";
public static final String PROPERTY_REAL_QUEUE_ID = "REAL_QID";
public static final String PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG";
@@ -113,6 +114,7 @@ public class MessageConst {
STRING_HASH_SET.add(PROPERTY_WAIT_STORE_MSG_OK);
STRING_HASH_SET.add(PROPERTY_DELAY_TIME_LEVEL);
STRING_HASH_SET.add(PROPERTY_RETRY_TOPIC);
+ STRING_HASH_SET.add(PROPERTY_ORIGIN_GROUP);
STRING_HASH_SET.add(PROPERTY_REAL_TOPIC);
STRING_HASH_SET.add(PROPERTY_REAL_QUEUE_ID);
STRING_HASH_SET.add(PROPERTY_TRANSACTION_PREPARED);