This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new bdae09ddf2 [RIP-80] #9928 Implementation of Priority Message (#9929)
bdae09ddf2 is described below
commit bdae09ddf22e63122c9ec82ad275450f0c915897
Author: imzs <[email protected]>
AuthorDate: Thu Dec 18 10:18:25 2025 +0800
[RIP-80] #9928 Implementation of Priority Message (#9929)
* #9928 Implementation of Priority Message
* switch to fastjson2
Change-Id: I3620b00b79a77a93a7cf0fdfac857fb495638ca6
* fix bazel CI, upgrade rocketmq-proto to 2.1.1
Change-Id: Ia66cc7b14b89dc319044ced5ba349315e487c849
* Fix bazel CI
* fix CI, temporarily disable popKv in OffsetResetForPopIT
Change-Id: I0b406cf0ece5067de430c4404aaa1f2dd46edcba
---------
Co-authored-by: RongtongJin <[email protected]>
---
WORKSPACE | 2 +-
.../rocketmq/broker/pop/PopConsumerService.java | 99 +++++--
.../broker/processor/PopMessageProcessor.java | 11 +-
.../broker/processor/PopReviveService.java | 51 +++-
.../broker/processor/SendMessageProcessor.java | 10 +
.../broker/pop/PopConsumerServiceTest.java | 3 +
.../org/apache/rocketmq/common/BrokerConfig.java | 31 +-
.../common/SubscriptionGroupAttributes.java | 9 +
.../common/attribute/TopicMessageType.java | 11 +-
.../apache/rocketmq/common/message/Message.java | 13 +
.../rocketmq/common/message/MessageConst.java | 2 +
.../common/attribute/TopicMessageTypeTest.java | 19 +-
pom.xml | 2 +-
.../proxy/grpc/v2/common/GrpcConverter.java | 6 +
.../grpc/v2/producer/SendMessageActivity.java | 6 +
.../proxy/grpc/v2/route/RouteActivity.java | 2 +
.../grpc/v2/producer/SendMessageActivityTest.java | 28 ++
.../subscription/SubscriptionGroupConfig.java | 10 +
.../test/client/rmq/RMQNormalProducer.java | 1 +
.../rocketmq/test/sendresult/ResultWrapper.java | 10 +
.../rocketmq/test/util/MQAdminTestUtils.java | 6 +-
.../org/apache/rocketmq/test/base/BaseConf.java | 13 +-
.../rocketmq/test/base/IntegrationTestBase.java | 1 +
.../test/client/consumer/pop/BasePopNormally.java | 8 +
.../test/client/consumer/pop/PopPriorityIT.java | 319 +++++++++++++++++++++
.../rocketmq/test/offset/OffsetResetForPopIT.java | 1 +
26 files changed, 621 insertions(+), 53 deletions(-)
diff --git a/WORKSPACE b/WORKSPACE
index 0f06aa2112..9c532b55ef 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -71,7 +71,7 @@ maven_install(
"org.bouncycastle:bcpkix-jdk15on:1.69",
"com.google.code.gson:gson:2.8.9",
"com.googlecode.concurrentlinkedhashmap:concurrentlinkedhashmap-lru:1.4.2",
- "org.apache.rocketmq:rocketmq-proto:2.0.4",
+ "org.apache.rocketmq:rocketmq-proto:2.1.1",
"com.google.protobuf:protobuf-java:3.20.1",
"com.google.protobuf:protobuf-java-util:3.20.1",
"com.conversantmedia:disruptor:1.2.10",
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
index 839c96e390..7476a6c206 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
@@ -25,9 +25,11 @@ import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.PopAckConstants;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicFilterType;
+import org.apache.rocketmq.common.attribute.TopicMessageType;
import org.apache.rocketmq.common.constant.ConsumeInitMode;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
@@ -38,6 +40,7 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
+import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.store.AppendMessageStatus;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
@@ -324,6 +327,23 @@ public class PopConsumerService extends ServiceThread {
});
}
+ protected CompletableFuture<PopConsumerContext>
getMessageFromTopicAsync(CompletableFuture<PopConsumerContext> future,
+ String clientHost, String groupId, String topicId, long requestCount,
int batchSize, MessageFilter filter,
+ PopConsumerRecord.RetryType retryType) {
+ TopicConfig topicConfig =
this.brokerController.getTopicConfigManager().selectTopicConfig(topicId);
+ if (null == topicConfig) {
+ return future;
+ }
+ for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
+ long index =
(brokerController.getBrokerConfig().isPriorityOrderAsc() ?
+ topicConfig.getReadQueueNums() - 1 - i : i) + requestCount;
+ int current = (int) index % topicConfig.getReadQueueNums();
+ future = this.getMessageAsync(future, clientHost, groupId,
+ topicId, current, batchSize, filter, retryType);
+ }
+ return future;
+ }
+
public CompletableFuture<PopConsumerContext> popAsync(String clientHost,
long popTime, long invisibleTime,
String groupId, String topicId, int queueId, int batchSize, boolean
fifo, String attemptId, int initMode,
MessageFilter filter) {
@@ -336,6 +356,12 @@ public class PopConsumerService extends ServiceThread {
return CompletableFuture.completedFuture(popConsumerContext);
}
+ SubscriptionGroupConfig subscriptionGroupConfig =
+
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupId);
+ if (null == subscriptionGroupConfig ||
!subscriptionGroupConfig.isConsumeEnable()) {
+ return CompletableFuture.completedFuture(popConsumerContext);
+ }
+
log.debug("PopConsumerService popAsync, groupId={}, topicId={},
queueId={}, " +
"batchSize={}, invisibleTime={}, fifo={}, attemptId={},
filter={}",
groupId, topicId, queueId, batchSize, invisibleTime, fifo,
attemptId, filter);
@@ -345,7 +371,13 @@ public class PopConsumerService extends ServiceThread {
String retryTopicV2 = KeyBuilder.buildPopRetryTopicV2(topicId,
groupId);
long requestCount =
Objects.requireNonNull(ConcurrentHashMapUtils.computeIfAbsent(
requestCountTable, requestKey, k -> new
AtomicLong(0L))).getAndIncrement();
- boolean preferRetry = requestCount % 5L == 0L;
+ boolean usePriorityMode =
TopicMessageType.PRIORITY.equals(topicConfig.getTopicMessageType())
+ && !fifo && requestCount % 100L <
subscriptionGroupConfig.getPriorityFactor();
+ int probability = usePriorityMode ?
+ brokerConfig.getPopFromRetryProbabilityForPriority() :
brokerConfig.getPopFromRetryProbability();
+ probability = Math.max(0, Math.min(100, probability)); // [51, 100]
means always
+ boolean preferRetry = probability > 0 && requestCount % (100 /
probability) == 0L;
+ requestCount = usePriorityMode ? 0 : requestCount; // use requestCount
as randomQ
CompletableFuture<PopConsumerContext> getMessageFuture =
CompletableFuture.completedFuture(popConsumerContext);
@@ -353,13 +385,13 @@ public class PopConsumerService extends ServiceThread {
try {
if (!fifo && preferRetry) {
if (brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) {
- getMessageFuture = this.getMessageAsync(getMessageFuture,
clientHost, groupId,
- retryTopicV1, 0, batchSize, filter,
PopConsumerRecord.RetryType.RETRY_TOPIC_V1);
+ getMessageFuture =
this.getMessageFromTopicAsync(getMessageFuture, clientHost, groupId,
+ retryTopicV1, requestCount, batchSize, filter,
PopConsumerRecord.RetryType.RETRY_TOPIC_V1);
}
if (brokerConfig.isEnableRetryTopicV2()) {
- getMessageFuture = this.getMessageAsync(getMessageFuture,
clientHost, groupId,
- retryTopicV2, 0, batchSize, filter,
PopConsumerRecord.RetryType.RETRY_TOPIC_V2);
+ getMessageFuture =
this.getMessageFromTopicAsync(getMessageFuture, clientHost, groupId,
+ retryTopicV2, requestCount, batchSize, filter,
PopConsumerRecord.RetryType.RETRY_TOPIC_V2);
}
}
@@ -367,21 +399,18 @@ public class PopConsumerService extends ServiceThread {
getMessageFuture = this.getMessageAsync(getMessageFuture,
clientHost, groupId,
topicId, queueId, batchSize, filter,
PopConsumerRecord.RetryType.NORMAL_TOPIC);
} else {
- for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
- int current = (int) ((requestCount + i) %
topicConfig.getReadQueueNums());
- getMessageFuture = this.getMessageAsync(getMessageFuture,
clientHost, groupId,
- topicId, current, batchSize, filter,
PopConsumerRecord.RetryType.NORMAL_TOPIC);
- }
+ getMessageFuture =
this.getMessageFromTopicAsync(getMessageFuture, clientHost, groupId,
+ topicId, requestCount, batchSize, filter,
PopConsumerRecord.RetryType.NORMAL_TOPIC);
if (!fifo && !preferRetry) {
if (brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) {
- getMessageFuture =
this.getMessageAsync(getMessageFuture, clientHost, groupId,
- retryTopicV1, 0, batchSize, filter,
PopConsumerRecord.RetryType.RETRY_TOPIC_V1);
+ getMessageFuture =
this.getMessageFromTopicAsync(getMessageFuture, clientHost, groupId,
+ retryTopicV1, requestCount, batchSize, filter,
PopConsumerRecord.RetryType.RETRY_TOPIC_V1);
}
if (brokerConfig.isEnableRetryTopicV2()) {
- getMessageFuture =
this.getMessageAsync(getMessageFuture, clientHost, groupId,
- retryTopicV2, 0, batchSize, filter,
PopConsumerRecord.RetryType.RETRY_TOPIC_V2);
+ getMessageFuture =
this.getMessageFromTopicAsync(getMessageFuture, clientHost, groupId,
+ retryTopicV2, requestCount, batchSize, filter,
PopConsumerRecord.RetryType.RETRY_TOPIC_V2);
}
}
}
@@ -568,21 +597,33 @@ public class PopConsumerService extends ServiceThread {
return consumerRecords.size();
}
- public void createRetryTopicIfNeeded(String groupId, String topicId) {
- TopicConfig topicConfig =
brokerController.getTopicConfigManager().selectTopicConfig(topicId);
- if (topicConfig != null) {
+ public void createRetryTopicIfNeeded(String groupId, String retryTopic) {
+ TopicConfig topicConfig =
brokerController.getTopicConfigManager().selectTopicConfig(retryTopic);
+ if (topicConfig != null &&
!brokerController.getBrokerConfig().isUseSeparateRetryQueue()) {
return;
}
- topicConfig = new TopicConfig(topicId, 1, 1,
+ int retryQueueNum = PopAckConstants.retryQueueNum;
+ if (brokerController.getBrokerConfig().isUseSeparateRetryQueue()) {
+ String normalTopic = KeyBuilder.parseNormalTopic(retryTopic,
groupId);
+ TopicConfig normalConfig =
brokerController.getTopicConfigManager().selectTopicConfig(normalTopic); //
always exists
+ retryQueueNum = normalConfig.getWriteQueueNums();
+ if (topicConfig != null && topicConfig.getWriteQueueNums() ==
normalConfig.getWriteQueueNums()) {
+ return;
+ }
+ }
+
+ topicConfig = new TopicConfig(retryTopic, retryQueueNum, retryQueueNum,
PermName.PERM_READ | PermName.PERM_WRITE, 0);
topicConfig.setTopicFilterType(TopicFilterType.SINGLE_TAG);
brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
- long offset =
this.brokerController.getConsumerOffsetManager().queryOffset(groupId, topicId,
0);
- if (offset < 0) {
- this.brokerController.getConsumerOffsetManager().commitOffset(
- "InitPopOffset", groupId, topicId, 0, 0);
+ for (int i = 0; i < retryQueueNum; i++) {
+ long offset =
this.brokerController.getConsumerOffsetManager().queryOffset(groupId,
retryTopic, i);
+ if (offset < 0) {
+ this.brokerController.getConsumerOffsetManager().commitOffset(
+ "InitPopOffset", groupId, retryTopic, i, 0);
+ }
}
}
@@ -605,7 +646,7 @@ public class PopConsumerService extends ServiceThread {
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(retryTopic);
msgInner.setBody(messageExt.getBody() != null ? messageExt.getBody() :
new byte[] {});
- msgInner.setQueueId(0);
+ msgInner.setQueueId(getRetryQueueId(retryTopic, messageExt));
if (messageExt.getTags() != null) {
msgInner.setTags(messageExt.getTags());
} else {
@@ -647,6 +688,18 @@ public class PopConsumerService extends ServiceThread {
return true;
}
+ private int getRetryQueueId(String retryTopic, MessageExt oriMsg) {
+ if (!brokerController.getBrokerConfig().isUseSeparateRetryQueue()) {
+ return 0;
+ }
+ int oriQueueId = oriMsg.getQueueId(); // original qid of normal or
retry topic
+ if (oriQueueId >
brokerController.getTopicConfigManager().selectTopicConfig(retryTopic).getWriteQueueNums()
- 1) {
+ log.warn("not expected, {}, {}, {}", retryTopic, oriQueueId,
oriMsg.getMsgId());
+ return 0; // fallback
+ }
+ return oriQueueId;
+ }
+
// Export kv store record to revive topic
@SuppressWarnings("ExtractMethodRecommender")
public synchronized void transferToFsStore() {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 3144eb973a..c32e1b5ae2 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -39,6 +39,7 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PopAckConstants;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.attribute.TopicMessageType;
import org.apache.rocketmq.common.constant.ConsumeInitMode;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
@@ -512,11 +513,15 @@ public class PopMessageProcessor implements NettyRequestProcessor {
// considered the same type because they share the same retry flag in
previous fields.
// Therefore, needRetryV1 is designed as a subset of needRetry, and
within a single request,
// only one type of retry topic is able to call popMsgFromQueue.
- boolean needRetry = randomQ <
brokerConfig.getPopFromRetryProbability();
+ boolean usePriorityMode =
TopicMessageType.PRIORITY.equals(topicConfig.getTopicMessageType())
+ && !requestHeader.isOrder() && randomQ <
subscriptionGroupConfig.getPriorityFactor();
+ boolean needRetry = randomQ < (usePriorityMode ?
+ brokerConfig.getPopFromRetryProbabilityForPriority() :
brokerConfig.getPopFromRetryProbability());
boolean needRetryV1 = false;
if (brokerConfig.isEnableRetryTopicV2() &&
brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) {
needRetryV1 = randomQ % 2 == 0;
}
+ randomQ = usePriorityMode ? 0 : randomQ; // reset randomQ
long popTime = System.currentTimeMillis();
CompletableFuture<Long> getMessageFuture =
CompletableFuture.completedFuture(0L);
if (needRetry && !requestHeader.isOrder()) {
@@ -653,7 +658,9 @@ public class PopMessageProcessor implements NettyRequestProcessor {
StringBuilder msgOffsetInfo, StringBuilder orderCountInfo, int
randomQ, CompletableFuture<Long> getMessageFuture) {
if (topicConfig != null) {
for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
- int queueId = (randomQ + i) % topicConfig.getReadQueueNums();
+ int index =
(brokerController.getBrokerConfig().isPriorityOrderAsc() ?
+ topicConfig.getReadQueueNums() - 1 - i : i) + randomQ;
+ int queueId = index % topicConfig.getReadQueueNums();
getMessageFuture = getMessageFuture.thenCompose(restNum ->
popMsgFromQueue(topicConfig.getTopicName(),
requestHeader.getAttemptId(), isRetry,
getMessageResult, requestHeader, queueId, restNum,
reviveQid, channel, popTime, messageFilter,
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
index 434812883e..e88879d9c6 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
@@ -112,7 +112,6 @@ public class PopReviveService extends ServiceThread {
msgInner.setTopic(popCheckPoint.getTopic());
}
msgInner.setBody(messageExt.getBody());
- msgInner.setQueueId(0);
if (messageExt.getTags() != null) {
msgInner.setTags(messageExt.getTags());
} else {
@@ -131,6 +130,7 @@ public class PopReviveService extends ServiceThread {
msgInner.getProperties().put(MessageConst.PROPERTY_ORIGIN_GROUP,
popCheckPoint.getCId());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
addRetryTopicIfNotExist(msgInner.getTopic(), popCheckPoint.getCId());
+ msgInner.setQueueId(getRetryQueueId(msgInner.getTopic(), messageExt));
PutMessageResult putMessageResult =
brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
brokerController.getBrokerMetricsManager().getPopMetricsManager().incPopReviveRetryMessageCount(popCheckPoint,
putMessageResult.getPutMessageStatus());
if (brokerController.getBrokerConfig().isEnablePopLog()) {
@@ -150,30 +150,55 @@ public class PopReviveService extends ServiceThread {
return true;
}
- private void initPopRetryOffset(String topic, String consumerGroup) {
- long offset =
this.brokerController.getConsumerOffsetManager().queryOffset(consumerGroup,
topic, 0);
- if (offset < 0) {
-
this.brokerController.getConsumerOffsetManager().commitOffset("initPopRetryOffset",
consumerGroup, topic,
- 0, 0);
+ private void initPopRetryOffset(String retryTopic, String consumerGroup,
int retryQueueNum) {
+ for (int i = 0; i < retryQueueNum; i++) {
+ long offset =
this.brokerController.getConsumerOffsetManager().queryOffset(consumerGroup,
retryTopic, i);
+ if (offset < 0) {
+
this.brokerController.getConsumerOffsetManager().commitOffset("initPopRetryOffset",
consumerGroup, retryTopic, i, 0);
+ }
}
}
- public void addRetryTopicIfNotExist(String topic, String consumerGroup) {
+ public void addRetryTopicIfNotExist(String retryTopic, String
consumerGroup) {
if (brokerController != null) {
- TopicConfig topicConfig =
brokerController.getTopicConfigManager().selectTopicConfig(topic);
- if (topicConfig != null) {
+ TopicConfig topicConfig =
brokerController.getTopicConfigManager().selectTopicConfig(retryTopic);
+ if (topicConfig != null &&
!brokerController.getBrokerConfig().isUseSeparateRetryQueue()) {
return;
}
- topicConfig = new TopicConfig(topic);
- topicConfig.setReadQueueNums(PopAckConstants.retryQueueNum);
- topicConfig.setWriteQueueNums(PopAckConstants.retryQueueNum);
+
+ int retryQueueNum = PopAckConstants.retryQueueNum;
+ if (brokerController.getBrokerConfig().isUseSeparateRetryQueue()) {
+ String normalTopic = KeyBuilder.parseNormalTopic(retryTopic,
consumerGroup);
+ TopicConfig normalConfig =
brokerController.getTopicConfigManager().selectTopicConfig(normalTopic); //
always exists
+ retryQueueNum = normalConfig.getWriteQueueNums();
+ if (topicConfig != null && topicConfig.getWriteQueueNums() ==
normalConfig.getWriteQueueNums()) {
+ return;
+ }
+ }
+
+ // create new one, or update in case of queue expansion
+ topicConfig = new TopicConfig(retryTopic);
+ topicConfig.setReadQueueNums(retryQueueNum);
+ topicConfig.setWriteQueueNums(retryQueueNum);
topicConfig.setTopicFilterType(TopicFilterType.SINGLE_TAG);
topicConfig.setPerm(6);
topicConfig.setTopicSysFlag(0);
brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
- initPopRetryOffset(topic, consumerGroup);
+ initPopRetryOffset(retryTopic, consumerGroup, retryQueueNum);
+ }
+ }
+
+ private int getRetryQueueId(String retryTopic, MessageExt messageExt) {
+ if (!brokerController.getBrokerConfig().isUseSeparateRetryQueue()) {
+ return 0;
+ }
+ int oriQueueId = messageExt.getQueueId(); // original qid of normal or
retry topic
+ if (oriQueueId >
brokerController.getTopicConfigManager().selectTopicConfig(retryTopic).getWriteQueueNums()
- 1) {
+ POP_LOGGER.warn("not expected, {}, {}, {}", retryTopic,
oriQueueId, messageExt.getMsgId());
+ return 0; // fallback
}
+ return oriQueueId;
}
protected List<MessageExt> getReviveMessage(long offset, int queueId) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index eefdb85ccf..c8e7e4c128 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -281,6 +281,16 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
}
MessageAccessor.setProperties(msgInner, oriProps);
+ // check properties to ensure exclusive, don't check topic meta config
to keep the behavior consistent
+ int msgPriority = msgInner.getPriority();
+ if (msgPriority >= 0) {
+ if
(TopicMessageType.PRIORITY.equals(TopicMessageType.parseFromMessageProperty(msgInner.getProperties())))
{
+ queueIdInt = Math.min(msgPriority,
topicConfig.getWriteQueueNums() - 1);
+ msgInner.setQueueId(queueIdInt);
+ } else {
+ MessageAccessor.clearProperty(msgInner,
MessageConst.PROPERTY_PRIORITY);
+ }
+ }
CleanupPolicy cleanupPolicy =
CleanupPolicyUtils.getDeletePolicy(Optional.of(topicConfig));
if (Objects.equals(cleanupPolicy, CleanupPolicy.COMPACTION)) {
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java
index db5f60fb17..dfa626c885 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java
@@ -37,6 +37,7 @@ import org.apache.rocketmq.broker.longpolling.PopLongPollingService;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.broker.pop.orderly.ConsumerOrderInfoManager;
import org.apache.rocketmq.broker.processor.PopMessageProcessor;
+import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.KeyBuilder;
@@ -93,6 +94,7 @@ public class PopConsumerServiceTest {
messageStoreConfig.setStorePathRootDir(filePath);
TopicConfigManager topicConfigManager =
Mockito.mock(TopicConfigManager.class);
+ SubscriptionGroupManager subscriptionGroupManager =
Mockito.mock(SubscriptionGroupManager.class);
ConsumerOffsetManager consumerOffsetManager =
Mockito.mock(ConsumerOffsetManager.class);
PopMessageProcessor popMessageProcessor =
Mockito.mock(PopMessageProcessor.class);
PopLongPollingService popLongPollingService =
Mockito.mock(PopLongPollingService.class);
@@ -101,6 +103,7 @@ public class PopConsumerServiceTest {
brokerController = Mockito.mock(BrokerController.class);
Mockito.when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
Mockito.when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
+
Mockito.when(brokerController.getSubscriptionGroupManager()).thenReturn(subscriptionGroupManager);
Mockito.when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
Mockito.when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager);
Mockito.when(brokerController.getPopMessageProcessor()).thenReturn(popMessageProcessor);
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index a46435543a..5142ed12be 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -240,13 +240,18 @@ public class BrokerConfig extends BrokerIdentity {
private boolean retrieveMessageFromPopRetryTopicV1 = true;
private boolean enableRetryTopicV2 = false;
private int popFromRetryProbability = 20;
+ // pop retry probability for priority mode
+ private int popFromRetryProbabilityForPriority = 0;
+ // 0 as the lowest priority if true
+ private boolean priorityOrderAsc = true;
private boolean popConsumerFSServiceInit = true;
private boolean popConsumerKVServiceLog = false;
private boolean popConsumerKVServiceInit = false;
private boolean popConsumerKVServiceEnable = false;
private int popReviveMaxReturnSizePerRead = 16 * 1024;
private int popReviveMaxAttemptTimes = 16;
-
+ // each message queue will have a corresponding retry queue
+ private boolean useSeparateRetryQueue = false;
private boolean realTimeNotifyConsumerChange = true;
private boolean litePullMessageEnable = true;
@@ -2177,4 +2182,28 @@ public class BrokerConfig extends BrokerIdentity {
public void setSplitMetadataSize(int splitMetadataSize) {
this.splitMetadataSize = splitMetadataSize;
}
+
+ public int getPopFromRetryProbabilityForPriority() {
+ return popFromRetryProbabilityForPriority;
+ }
+
+ public void setPopFromRetryProbabilityForPriority(int
popFromRetryProbabilityForPriority) {
+ this.popFromRetryProbabilityForPriority =
popFromRetryProbabilityForPriority;
+ }
+
+ public boolean isPriorityOrderAsc() {
+ return priorityOrderAsc;
+ }
+
+ public void setPriorityOrderAsc(boolean priorityOrderAsc) {
+ this.priorityOrderAsc = priorityOrderAsc;
+ }
+
+ public boolean isUseSeparateRetryQueue() {
+ return useSeparateRetryQueue;
+ }
+
+ public void setUseSeparateRetryQueue(boolean useSeparateRetryQueue) {
+ this.useSeparateRetryQueue = useSeparateRetryQueue;
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/SubscriptionGroupAttributes.java b/common/src/main/java/org/apache/rocketmq/common/SubscriptionGroupAttributes.java
index 5b0072401c..845f407939 100644
--- a/common/src/main/java/org/apache/rocketmq/common/SubscriptionGroupAttributes.java
+++ b/common/src/main/java/org/apache/rocketmq/common/SubscriptionGroupAttributes.java
@@ -19,11 +19,20 @@ package org.apache.rocketmq.common;
import java.util.HashMap;
import java.util.Map;
import org.apache.rocketmq.common.attribute.Attribute;
+import org.apache.rocketmq.common.attribute.LongRangeAttribute;
public class SubscriptionGroupAttributes {
public static final Map<String, Attribute> ALL;
+ public static final LongRangeAttribute PRIORITY_FACTOR_ATTRIBUTE = new
LongRangeAttribute(
+ "priority.factor",
+ true,
+ 0, // disable priority mode
+ 100, // enable priority mode
+ 100
+ );
static {
ALL = new HashMap<>();
+ ALL.put(PRIORITY_FACTOR_ATTRIBUTE.getName(),
PRIORITY_FACTOR_ATTRIBUTE);
}
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java b/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java
index 5e581a34ee..9a89d30e8f 100644
--- a/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java
+++ b/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java
@@ -28,6 +28,7 @@ public enum TopicMessageType {
FIFO("FIFO"),
DELAY("DELAY"),
TRANSACTION("TRANSACTION"),
+ PRIORITY("PRIORITY"),
MIXED("MIXED");
private final String value;
@@ -36,7 +37,8 @@ public enum TopicMessageType {
}
public static Set<String> topicMessageTypeSet() {
- return Sets.newHashSet(UNSPECIFIED.value, NORMAL.value, FIFO.value,
DELAY.value, TRANSACTION.value, MIXED.value);
+ return Sets.newHashSet(UNSPECIFIED.value, NORMAL.value, FIFO.value,
DELAY.value, TRANSACTION.value,
+ PRIORITY.value, MIXED.value);
}
public String getValue() {
@@ -44,9 +46,8 @@ public enum TopicMessageType {
}
public static TopicMessageType parseFromMessageProperty(Map<String,
String> messageProperty) {
- String isTrans =
messageProperty.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
- String isTransValue = "true";
- if (isTransValue.equals(isTrans)) {
+ // the parse order keeps message types mutually exclusive
+ if
(Boolean.parseBoolean(messageProperty.get(MessageConst.PROPERTY_TRANSACTION_PREPARED)))
{
return TopicMessageType.TRANSACTION;
} else if (messageProperty.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL)
!= null
|| messageProperty.get(MessageConst.PROPERTY_TIMER_DELIVER_MS) !=
null
@@ -55,6 +56,8 @@ public enum TopicMessageType {
return TopicMessageType.DELAY;
} else if (messageProperty.get(MessageConst.PROPERTY_SHARDING_KEY) !=
null) {
return TopicMessageType.FIFO;
+ } else if (messageProperty.get(MessageConst.PROPERTY_PRIORITY) !=
null) {
+ return TopicMessageType.PRIORITY;
}
return TopicMessageType.NORMAL;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/Message.java b/common/src/main/java/org/apache/rocketmq/common/message/Message.java
index acd4df96d2..b64f3520c1 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/Message.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/Message.java
@@ -16,6 +16,8 @@
*/
package org.apache.rocketmq.common.message;
+import org.apache.commons.lang3.math.NumberUtils;
+
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
@@ -154,6 +156,17 @@ public class Message implements Serializable {
this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL,
String.valueOf(level));
}
+ public void setPriority(int priority) {
+ if (priority < 0) {
+ throw new IllegalArgumentException("The priority must be greater
than or equal to 0");
+ }
+ this.putProperty(MessageConst.PROPERTY_PRIORITY,
String.valueOf(priority));
+ }
+
+ public int getPriority() {
+ return
NumberUtils.toInt(this.getProperty(MessageConst.PROPERTY_PRIORITY), -1);
+ }
+
public boolean isWaitStoreMsgOK() {
String result =
this.getProperty(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);
if (null == result) {
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
index 81f132d134..72078f761d 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
@@ -44,6 +44,7 @@ public class MessageConst {
public static final String PROPERTY_EXTEND_UNIQ_INFO = "EXTEND_UNIQ_INFO";
public static final String PROPERTY_MAX_RECONSUME_TIMES =
"MAX_RECONSUME_TIMES";
public static final String PROPERTY_CONSUME_START_TIMESTAMP =
"CONSUME_START_TIME";
+ public static final String PROPERTY_PRIORITY = "_SYS_MSG_PRIORITY_";
public static final String PROPERTY_INNER_NUM = "INNER_NUM";
public static final String PROPERTY_INNER_BASE = "INNER_BASE";
public static final String DUP_INFO = "DUP_INFO";
@@ -169,5 +170,6 @@ public class MessageConst {
STRING_HASH_SET.add(PROPERTY_DLQ_ORIGIN_TOPIC);
STRING_HASH_SET.add(PROPERTY_DLQ_ORIGIN_MESSAGE_ID);
STRING_HASH_SET.add(PROPERTY_CRC32);
+ STRING_HASH_SET.add(PROPERTY_PRIORITY);
}
}
diff --git a/common/src/test/java/org/apache/rocketmq/common/attribute/TopicMessageTypeTest.java b/common/src/test/java/org/apache/rocketmq/common/attribute/TopicMessageTypeTest.java
index 0321679ccc..79402ca1b2 100644
--- a/common/src/test/java/org/apache/rocketmq/common/attribute/TopicMessageTypeTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/attribute/TopicMessageTypeTest.java
@@ -33,6 +33,7 @@ public class TopicMessageTypeTest {
private Map<String, String> transactionMessageProperty;
private Map<String, String> delayMessageProperty;
private Map<String, String> fifoMessageProperty;
+ private Map<String, String> priorityMessageProperty;
@Before
public void setUp() {
@@ -40,15 +41,18 @@ public class TopicMessageTypeTest {
transactionMessageProperty = new HashMap<>();
delayMessageProperty = new HashMap<>();
fifoMessageProperty = new HashMap<>();
+ priorityMessageProperty = new HashMap<>();
transactionMessageProperty.put(MessageConst.PROPERTY_TRANSACTION_PREPARED,
"true");
delayMessageProperty.put(MessageConst.PROPERTY_DELAY_TIME_LEVEL, "1");
fifoMessageProperty.put(MessageConst.PROPERTY_SHARDING_KEY,
"shardingKey");
+ priorityMessageProperty.put(MessageConst.PROPERTY_PRIORITY, "3");
}
@Test
public void testTopicMessageTypeSet() {
- Set<String> expectedSet = Sets.newHashSet("UNSPECIFIED", "NORMAL",
"FIFO", "DELAY", "TRANSACTION", "MIXED");
+ Set<String> expectedSet
+ = Sets.newHashSet("UNSPECIFIED", "NORMAL", "FIFO", "DELAY",
"TRANSACTION", "PRIORITY", "MIXED");
Set<String> actualSet = TopicMessageType.topicMessageTypeSet();
assertEquals(expectedSet, actualSet);
}
@@ -77,6 +81,12 @@ public class TopicMessageTypeTest {
assertEquals(TopicMessageType.FIFO, actual);
}
+ @Test
+ public void testParseFromMessageProperty_Priority() {
+ TopicMessageType actual =
TopicMessageType.parseFromMessageProperty(priorityMessageProperty);
+ assertEquals(TopicMessageType.PRIORITY, actual);
+ }
+
@Test
public void testGetMetricsValue() {
for (TopicMessageType type : TopicMessageType.values()) {
@@ -116,6 +126,13 @@ public class TopicMessageTypeTest {
properties.put(MessageConst.PROPERTY_SHARDING_KEY, "sharding_key");
Assert.assertEquals(TopicMessageType.FIFO,
TopicMessageType.parseFromMessageProperty(properties));
+ // PRIORITY
+ properties.clear();
+ properties.put(MessageConst.PROPERTY_PRIORITY, "3");
+ Assert.assertEquals(TopicMessageType.PRIORITY,
TopicMessageType.parseFromMessageProperty(properties));
+ properties.put(MessageConst.PROPERTY_DELAY_TIME_LEVEL, "3");
+ Assert.assertEquals(TopicMessageType.DELAY,
TopicMessageType.parseFromMessageProperty(properties));
+
// NORMAL
properties.clear();
Assert.assertEquals(TopicMessageType.NORMAL,
TopicMessageType.parseFromMessageProperty(properties));
diff --git a/pom.xml b/pom.xml
index 4343c3a574..088dda8b77 100644
--- a/pom.xml
+++ b/pom.xml
@@ -127,7 +127,7 @@
<annotations-api.version>6.0.53</annotations-api.version>
<extra-enforcer-rules.version>1.0-beta-4</extra-enforcer-rules.version>
<concurrentlinkedhashmap-lru.version>1.4.2</concurrentlinkedhashmap-lru.version>
- <rocketmq-proto.version>2.0.4</rocketmq-proto.version>
+ <rocketmq-proto.version>2.1.1</rocketmq-proto.version>
<grpc.version>1.53.0</grpc.version>
<protobuf.version>3.20.1</protobuf.version>
<disruptor.version>1.2.10</disruptor.version>
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java
index 33a4e1312f..4ce3dc831d 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java
@@ -212,6 +212,12 @@ public class GrpcConverter {
}
}
+ // priority
+ int priority = messageExt.getPriority();
+ if (priority >= 0) {
+ systemPropertiesBuilder.setPriority(priority);
+ }
+
// sharding key
String shardingKey =
messageExt.getProperty(MessageConst.PROPERTY_SHARDING_KEY);
if (shardingKey != null) {
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java
index f7b8014bb9..2c3ffd1305 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java
@@ -262,6 +262,12 @@ public class SendMessageActivity extends
AbstractMessingActivity {
// set delay level or deliver timestamp
fillDelayMessageProperty(message, messageWithHeader);
+ // set priority
+ if (message.getSystemProperties().hasPriority()) {
+ int priority = message.getSystemProperties().getPriority();
+ messageWithHeader.setPriority(priority);
+ }
+
// set reconsume times
int reconsumeTimes =
message.getSystemProperties().getDeliveryAttempt();
MessageAccessor.setReconsumeTime(messageWithHeader,
String.valueOf(reconsumeTimes));
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
index a4e79a856f..76f86b436d 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
@@ -306,6 +306,8 @@ public class RouteActivity extends AbstractMessingActivity {
return Collections.singletonList(MessageType.TRANSACTION);
case DELAY:
return Collections.singletonList(MessageType.DELAY);
+ case PRIORITY:
+ return Collections.singletonList(MessageType.PRIORITY);
case MIXED:
return Arrays.asList(MessageType.NORMAL, MessageType.FIFO,
MessageType.DELAY, MessageType.TRANSACTION);
default:
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java
index 4882a5ed8b..a64867ddfe 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java
@@ -268,6 +268,34 @@ public class SendMessageActivityTest extends
BaseActivityTest {
assertEquals(MessageSysFlag.TRANSACTION_PREPARED_TYPE |
MessageSysFlag.COMPRESSED_FLAG, sendMessageActivity.buildSysFlag(message));
}
+ @Test
+ public void testPriorityMessage() {
+ String msgId = MessageClientIDSetter.createUniqID();
+ Message message = Message.newBuilder()
+ .setTopic(Resource.newBuilder()
+ .setName(TOPIC)
+ .build())
+ .setSystemProperties(SystemProperties.newBuilder()
+ .setMessageId(msgId)
+ .setQueueId(0)
+ .setMessageType(MessageType.PRIORITY)
+ .setPriority(5)
+ .setBodyEncoding(Encoding.GZIP)
+
.setBornTimestamp(Timestamps.fromMillis(System.currentTimeMillis()))
+
.setBornHost(StringUtils.defaultString(NetworkUtil.getLocalAddress(),
"127.0.0.1:1234"))
+ .build())
+ .setBody(ByteString.copyFromUtf8("123"))
+ .build();
+ org.apache.rocketmq.common.message.Message messageExt =
this.sendMessageActivity.buildMessage(null,
+ Lists.newArrayList(
+ message
+ ),
+ Resource.newBuilder().setName(TOPIC).build()).get(0);
+
+ assertEquals(MessageClientIDSetter.getUniqID(messageExt), msgId);
+ assertEquals(5, messageExt.getPriority());
+ }
+
@Test
public void testSendOrderMessageQueueSelector() throws Exception {
TopicRouteData topicRouteData = new TopicRouteData();
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/subscription/SubscriptionGroupConfig.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/subscription/SubscriptionGroupConfig.java
index c9c2a8090c..2c3738a464 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/subscription/SubscriptionGroupConfig.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/subscription/SubscriptionGroupConfig.java
@@ -17,13 +17,17 @@
package org.apache.rocketmq.remoting.protocol.subscription;
+import com.alibaba.fastjson2.annotation.JSONField;
import com.google.common.base.MoreObjects;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.math.NumberUtils;
import org.apache.rocketmq.common.MixAll;
+import static
org.apache.rocketmq.common.SubscriptionGroupAttributes.PRIORITY_FACTOR_ATTRIBUTE;
+
public class SubscriptionGroupConfig {
private String groupName;
@@ -173,6 +177,12 @@ public class SubscriptionGroupConfig {
this.attributes = attributes;
}
+ @JSONField(serialize = false, deserialize = false)
+ public long getPriorityFactor() {
+ String factorStr = null == attributes ? null :
attributes.get(PRIORITY_FACTOR_ATTRIBUTE.getName());
+ return NumberUtils.toLong(factorStr,
PRIORITY_FACTOR_ATTRIBUTE.getDefaultValue());
+ }
+
@Override
public int hashCode() {
final int prime = 31;
diff --git
a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java
b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java
index 7df189a915..75444d3a1f 100644
---
a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java
+++
b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java
@@ -110,6 +110,7 @@ public class RMQNormalProducer extends AbstractMQProducer {
msgBodys.addData(new String(message.getBody(),
StandardCharsets.UTF_8));
originMsgs.addData(msg);
originMsgIndex.put(new String(message.getBody(),
StandardCharsets.UTF_8), internalSendResult);
+ sendResult.setSendResultObj(internalSendResult);
} catch (Exception e) {
if (isDebug) {
e.printStackTrace();
diff --git
a/test/src/main/java/org/apache/rocketmq/test/sendresult/ResultWrapper.java
b/test/src/main/java/org/apache/rocketmq/test/sendresult/ResultWrapper.java
index 9fe31463e4..d9a5987ff4 100644
--- a/test/src/main/java/org/apache/rocketmq/test/sendresult/ResultWrapper.java
+++ b/test/src/main/java/org/apache/rocketmq/test/sendresult/ResultWrapper.java
@@ -17,11 +17,14 @@
package org.apache.rocketmq.test.sendresult;
+import org.apache.rocketmq.client.producer.SendResult;
+
public class ResultWrapper {
private boolean sendResult = false;
private String msgId = null;
private Exception sendException = null;
private String brokerIp = null;
+ private SendResult sendResultObj = null;
public String getBrokerIp() {
return brokerIp;
@@ -55,6 +58,13 @@ public class ResultWrapper {
this.sendException = sendException;
}
+ public SendResult getSendResultObj() {
+ return sendResultObj;
+ }
+ public void setSendResultObj(SendResult sendResultObj) {
+ this.sendResultObj = sendResultObj;
+ }
+
@Override
public String toString() {
return String.format("sendstatus:%s msgId:%s", sendResult, msgId);
diff --git
a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
index 276d08d806..3b6154ae6b 100644
--- a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
+++ b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
@@ -104,12 +104,10 @@ public class MQAdminTestUtils {
return createResult;
}
- public static boolean createSub(String nameSrvAddr, String clusterName,
String consumerId) {
+ public static boolean createSub(String nameSrvAddr, String clusterName,
SubscriptionGroupConfig config) {
boolean createResult = true;
DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt();
mqAdminExt.setNamesrvAddr(nameSrvAddr);
- SubscriptionGroupConfig config = new SubscriptionGroupConfig();
- config.setGroupName(consumerId);
try {
mqAdminExt.start();
Set<String> masterSet =
CommandUtil.fetchMasterAddrByClusterName(mqAdminExt,
@@ -117,7 +115,7 @@ public class MQAdminTestUtils {
for (String addr : masterSet) {
try {
mqAdminExt.createAndUpdateSubscriptionGroupConfig(addr,
config);
- log.info("create subscription group {} to {} success.",
consumerId, addr);
+ log.info("create subscription group {} to {} success.",
config.getGroupName(), addr);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000 * 1);
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
index 472e106ce3..50741ba091 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
@@ -40,6 +40,7 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
+import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
@@ -203,15 +204,21 @@ public class BaseConf {
}
public static String initConsumerGroup() {
- String group = MQRandomUtils.getRandomConsumerGroup();
- return initConsumerGroup(group);
+ return initConsumerGroup(MQRandomUtils.getRandomConsumerGroup());
}
public static String initConsumerGroup(String group) {
- MQAdminTestUtils.createSub(NAMESRV_ADDR, CLUSTER_NAME, group);
+ SubscriptionGroupConfig config = new SubscriptionGroupConfig();
+ config.setGroupName(group);
+ MQAdminTestUtils.createSub(NAMESRV_ADDR, CLUSTER_NAME, config);
return group;
}
+ public static String initConsumerGroup(SubscriptionGroupConfig config) {
+ MQAdminTestUtils.createSub(NAMESRV_ADDR, CLUSTER_NAME, config);
+ return config.getGroupName();
+ }
+
public static DefaultMQAdminExt getAdmin(String nsAddr) {
final DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt(3 * 1000);
mqAdminExt.setNamesrvAddr(nsAddr);
diff --git
a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
index 287e54d561..cfcb989649 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
@@ -141,6 +141,7 @@ public class IntegrationTestBase {
brokerConfig.setRecallMessageEnable(true);
storeConfig.setEnableConsumeQueueExt(true);
brokerConfig.setLoadBalancePollNameServerInterval(500);
+ brokerConfig.setPopConsumerKVServiceInit(true);
storeConfig.setStorePathRootDir(baseDir);
storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog");
storeConfig.setMappedFileSizeCommitLog(commitLogSize);
diff --git
a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopNormally.java
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopNormally.java
index 2e29b95a5a..d4a1b3be5a 100644
---
a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopNormally.java
+++
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopNormally.java
@@ -18,11 +18,15 @@
package org.apache.rocketmq.test.client.consumer.pop;
import java.util.concurrent.CompletableFuture;
+
+import org.apache.rocketmq.client.consumer.AckResult;
import org.apache.rocketmq.client.consumer.PopResult;
import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.common.attribute.TopicMessageType;
import org.apache.rocketmq.common.constant.ConsumeInitMode;
import org.apache.rocketmq.common.filter.ExpressionType;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.test.base.IntegrationTestBase;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
@@ -69,4 +73,8 @@ public class BasePopNormally extends BasePop {
brokerAddr, messageQueue, invisibleTime, maxNums, group, 3000,
false,
ConsumeInitMode.MIN, false, ExpressionType.TAG, "*");
}
+
+ protected CompletableFuture<AckResult> ackMessageAsync(MessageExt
messageExt) {
+ return client.ackMessageAsync(brokerAddr, topic, group,
messageExt.getProperty(MessageConst.PROPERTY_POP_CK));
+ }
}
diff --git
a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopPriorityIT.java
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopPriorityIT.java
new file mode 100644
index 0000000000..98f7ae55bd
--- /dev/null
+++
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopPriorityIT.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.consumer.pop;
+
+import org.apache.rocketmq.client.consumer.PopResult;
+import org.apache.rocketmq.client.consumer.PopStatus;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.attribute.AttributeParser;
+import org.apache.rocketmq.common.attribute.CQType;
+import org.apache.rocketmq.common.attribute.TopicMessageType;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.test.base.IntegrationTestBase;
+import org.apache.rocketmq.test.util.TestUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.rocketmq.common.SubscriptionGroupAttributes.PRIORITY_FACTOR_ATTRIBUTE;
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class PopPriorityIT extends BasePopNormally {
+
+ private final boolean popConsumerKVServiceEnable;
+ private final boolean priorityOrderAsc;
+ private int writeQueueNum = 8;
+
+ public PopPriorityIT(boolean popConsumerKVServiceEnable, boolean
priorityOrderAsc) {
+ this.popConsumerKVServiceEnable = popConsumerKVServiceEnable;
+ this.priorityOrderAsc = priorityOrderAsc;
+ }
+
+ @Parameterized.Parameters
+ public static List<Object[]> params() {
+ List<Object[]> result = new ArrayList<>();
+ result.add(new Object[] {false, true});
+ result.add(new Object[] {false, false});
+ result.add(new Object[] {true, true});
+ result.add(new Object[] {true, false});
+ return result;
+ }
+
+ @Before
+ public void setUp() {
+ super.setUp();
+ // reset default config if changed
+ writeQueueNum = 8;
+
brokerController1.getBrokerConfig().setPopFromRetryProbabilityForPriority(0);
+ brokerController1.getBrokerConfig().setUseSeparateRetryQueue(false);
+
brokerController1.getBrokerConfig().setPopConsumerKVServiceEnable(popConsumerKVServiceEnable);
+
brokerController1.getBrokerConfig().setPriorityOrderAsc(priorityOrderAsc);
+ IntegrationTestBase.initTopic(topic, NAMESRV_ADDR, BROKER1_NAME,
writeQueueNum, CQType.SimpleCQ, TopicMessageType.PRIORITY);
+ }
+
+ @After
+ public void tearDown() {
+ super.tearDown();
+ }
+
+ @Test
+ public void test_normal_send() {
+ int priority = -1; // normal message
+ Set<Integer> queueIdSet = new HashSet<>();
+ for (int i = 0; i < 32; i++) {
+ Message message = mockMessage(topic, priority, "");
+ SendResult sendResult = producer.send(message,
null).getSendResultObj();
+ queueIdSet.add(sendResult.getMessageQueue().getQueueId());
+ }
+ assertTrue(queueIdSet.size() > 1);
+ }
+
+ @Test
+ public void test_priority_send() {
+ final int priority = 0; // priority message
+ for (int i = 0; i < 32; i++) {
+ Message message = mockMessage(topic, priority, "");
+ SendResult sendResult = producer.send(message,
null).getSendResultObj();
+ assertEquals(priority, sendResult.getMessageQueue().getQueueId());
+ }
+ }
+
+ @Test
+ public void test_priority_consume_always_high_priority() throws Exception {
+ int msgNumPerQueue = 20;
+ final int maxPriority = priorityOrderAsc ? writeQueueNum - 1 : 0;
+ for (int i = 0; i < writeQueueNum; i++) {
+ Message message = mockMessage(topic, i, String.valueOf(i));
+ for (int j = 0; j < msgNumPerQueue; j++) {
+ producer.send(message);
+ }
+ }
+ Assert.assertTrue(awaitDispatchMs(2000));
+ for (int i = 0; i < msgNumPerQueue; i++) {
+ PopResult popResult =
popMessageAsync(Duration.ofSeconds(600).toMillis(), 1, 30000).get();
+ TestUtil.waitForMonment(20); // wait lock release
+ assertEquals(PopStatus.FOUND, popResult.getPopStatus());
+ MessageExt message = popResult.getMsgFoundList().get(0);
+ assertEquals(maxPriority, message.getPriority()); // not a coincidence
+ }
+ }
+
+ @Test
+ public void test_priority_consume_from_high_to_low() throws Exception {
+ for (int i = 0; i < writeQueueNum; i++) {
+ Message message = mockMessage(topic, i, String.valueOf(i));
+ producer.send(message);
+ }
+ Assert.assertTrue(awaitDispatchMs(2000));
+ for (int i = 0; i < writeQueueNum; i++) {
+ PopResult popResult =
popMessageAsync(Duration.ofSeconds(30).toMillis(), 1, 30000).get();
+ assertEquals(PopStatus.FOUND, popResult.getPopStatus());
+ MessageExt message = popResult.getMsgFoundList().get(0);
+ int expectPriority = priorityOrderAsc ? writeQueueNum - 1 - i : i;
+ assertEquals(0, message.getQueueOffset());
+ assertEquals(expectPriority, message.getQueueId());
+ assertEquals(expectPriority, message.getPriority());
+ }
+ }
+
+ @Test
+ public void test_priority_consume_disable() throws Exception {
+ SubscriptionGroupConfig config = new SubscriptionGroupConfig();
+ config.setGroupName(group);
+ config.setAttributes(AttributeParser.parseToMap("+" +
PRIORITY_FACTOR_ATTRIBUTE.getName() + "=0"));
+ initConsumerGroup(config);
+
+ int msgNumPerQueue = 200;
+ for (int i = 0; i < writeQueueNum; i++) {
+ Message message = mockMessage(topic, i, String.valueOf(i));
+ for (int j = 0; j < msgNumPerQueue; j++) {
+ producer.send(message);
+ }
+ }
+ Assert.assertTrue(awaitDispatchMs(2000));
+ int sampleCount = 800;
+ int[] queueIdCount = new int[writeQueueNum];
+ for (int i = 0; i < sampleCount; i++) {
+ PopResult popResult =
popMessageAsync(Duration.ofSeconds(600).toMillis(), 1, 30000).get();
+ TestUtil.waitForMonment(10); // wait lock release
+ assertEquals(PopStatus.FOUND, popResult.getPopStatus());
+ MessageExt message = popResult.getMsgFoundList().get(0);
+ queueIdCount[message.getQueueId()] =
queueIdCount[message.getQueueId()] + 1;
+ }
+
+ double expectAverage = (double) sampleCount / writeQueueNum;
+ for (int count : queueIdCount) {
+ assertTrue(Math.abs(count - expectAverage) < expectAverage * 0.4);
+ }
+ }
+
+ @Test
+ public void test_priority_consume_retry_as_lowest() throws Exception {
+ // retry as lowest by default
+ int count = 100;
+ for (int i = 0; i < count; i++) {
+ Message message = mockMessage(topic, new
Random().nextInt(writeQueueNum), String.valueOf(i));
+ producer.send(message);
+ }
+ int invisibleTime = 3;
+ PopResult popResult =
popMessageAsync(Duration.ofSeconds(invisibleTime).toMillis(), 1, 30000).get();
+ assertEquals(PopStatus.FOUND, popResult.getPopStatus());
+ String retryId = popResult.getMsgFoundList().get(0).getMsgId();
+ TestUtil.waitForSeconds(invisibleTime + 3);
+ Assert.assertTrue(awaitDispatchMs(2000));
+
+ List<MessageExt> collect = new ArrayList<>();
+ await()
+ .pollInterval(1, TimeUnit.SECONDS)
+ .atMost(35, TimeUnit.SECONDS)
+ .until(() -> {
+ PopResult result =
popMessageAsync(Duration.ofSeconds(600).toMillis(), 32, 5000).get();
+ if (PopStatus.FOUND.equals(result.getPopStatus())) {
+ collect.addAll(result.getMsgFoundList());
+ return false;
+ }
+ return true;
+ });
+
+ assertEquals(count, collect.size());
+ assertEquals(1, collect.get(collect.size() - 1).getReconsumeTimes());
+ assertEquals(retryId, collect.get(collect.size() - 1).getMsgId());
+ }
+
+ @Test
+ public void test_priority_consume_retry_as_highest() throws Exception {
+
brokerController1.getBrokerConfig().setPopFromRetryProbabilityForPriority(100);
+ int count = 100;
+ for (int i = 0; i < count; i++) {
+ Message message = mockMessage(topic, new
Random().nextInt(writeQueueNum), String.valueOf(i));
+ producer.send(message);
+ }
+ int invisibleTime = 3;
+ PopResult popResult =
popMessageAsync(Duration.ofSeconds(invisibleTime).toMillis(), 1, 30000).get();
+ assertEquals(PopStatus.FOUND, popResult.getPopStatus());
+ String retryId = popResult.getMsgFoundList().get(0).getMsgId();
+ TestUtil.waitForSeconds(invisibleTime + 3);
+ Assert.assertTrue(awaitDispatchMs(2000));
+
+ List<MessageExt> collect = new ArrayList<>();
+ await()
+ .pollInterval(1, TimeUnit.SECONDS)
+ .atMost(35, TimeUnit.SECONDS)
+ .until(() -> {
+ PopResult result =
popMessageAsync(Duration.ofSeconds(600).toMillis(), 32, 5000).get();
+ if (PopStatus.FOUND.equals(result.getPopStatus())) {
+ collect.addAll(result.getMsgFoundList());
+ return false;
+ }
+ return true;
+ });
+
+ assertEquals(count, collect.size());
+ assertEquals(1, collect.get(0).getReconsumeTimes());
+ assertEquals(retryId, collect.get(0).getMsgId());
+ }
+
+ @Test
+ public void test_priority_consume_use_separate_retry_queue() throws
Exception {
+ brokerController1.getBrokerConfig().setUseSeparateRetryQueue(true);
+
brokerController1.getBrokerConfig().setPopFromRetryProbabilityForPriority(100);
+ for (int i = 0; i < writeQueueNum; i++) {
+ Message message = mockMessage(topic, i, String.valueOf(i));
+ producer.send(message);
+ }
+ Assert.assertTrue(awaitDispatchMs(2000));
+ int invisibleTime = 3;
+ PopResult popResult =
popMessageAsync(Duration.ofSeconds(invisibleTime).toMillis(), writeQueueNum,
30000).get();
+ assertEquals(PopStatus.FOUND, popResult.getPopStatus());
+ assertEquals(writeQueueNum, popResult.getMsgFoundList().size());
+ TestUtil.waitForSeconds(invisibleTime + 3);
+
+ popResult = popMessageAsync(Duration.ofSeconds(600).toMillis(), 32,
10000).get();
+ assertEquals(PopStatus.FOUND, popResult.getPopStatus());
+ assertEquals(writeQueueNum, popResult.getMsgFoundList().size());
+ for (int i = 0; i < writeQueueNum; i++) {
+ MessageExt message = popResult.getMsgFoundList().get(i);
+ assertEquals(0, message.getQueueOffset()); // means a separate retry queue
+ assertEquals(1, message.getReconsumeTimes());
+ int expectPriority = priorityOrderAsc ? writeQueueNum - 1 - i : i;
+ assertEquals(expectPriority, message.getQueueId());
+ assertEquals(expectPriority, message.getPriority());
+ }
+ }
+
+ @Test
+ public void
test_priority_consume_use_separate_retry_queue_with_queue_expansion() throws
Exception {
+ // retry as lowest by default
+ brokerController1.getBrokerConfig().setUseSeparateRetryQueue(true);
+ for (int i = 0; i < writeQueueNum; i++) {
+ Message message = mockMessage(topic, i, String.valueOf(i));
+ producer.send(message);
+ }
+ Assert.assertTrue(awaitDispatchMs(2000));
+ int invisibleTime = 3;
+ PopResult popResult =
popMessageAsync(Duration.ofSeconds(invisibleTime).toMillis(), writeQueueNum,
30000).get();
+ assertEquals(PopStatus.FOUND, popResult.getPopStatus());
+ assertEquals(writeQueueNum, popResult.getMsgFoundList().size());
+ TestUtil.waitForSeconds(invisibleTime + 3); // wait retry created
+
+ writeQueueNum = writeQueueNum * 2;
+ IntegrationTestBase.initTopic(topic, NAMESRV_ADDR, BROKER1_NAME,
writeQueueNum, CQType.SimpleCQ, TopicMessageType.PRIORITY);
+ for (int i = writeQueueNum / 2; i < writeQueueNum; i++) {
+ Message message = mockMessage(topic, i, String.valueOf(i));
+ producer.send(message);
+ }
+ Assert.assertTrue(awaitDispatchMs(2000));
+
+ popResult =
popMessageAsync(Duration.ofSeconds(invisibleTime).toMillis(), 32, 5000).get();
+ List<MessageExt> msgList = popResult.getMsgFoundList();
+ // asc == true, collect: [15 -> 8, 7 -> 0]
+ // asc == false, collect: [8 -> 15, 0 -> 7]
+ assertEquals(writeQueueNum, msgList.size());
+ assertEquals(priorityOrderAsc ? writeQueueNum - 1 : writeQueueNum / 2,
msgList.get(0).getQueueId());
+ assertEquals(priorityOrderAsc ? writeQueueNum - 1 : writeQueueNum / 2,
msgList.get(0).getPriority());
+ assertEquals(priorityOrderAsc ? 0 : writeQueueNum / 2 - 1,
msgList.get(msgList.size() - 1).getQueueId());
+ assertEquals(priorityOrderAsc ? 0 : writeQueueNum / 2 - 1,
msgList.get(msgList.size() - 1).getPriority());
+ assertEquals(1, msgList.get(msgList.size() - 1).getReconsumeTimes());
+ assertEquals(0, msgList.get(msgList.size() - 1).getQueueOffset()); // means a separate retry queue
+ }
+
+ private static Message mockMessage(String topic, int priority, String key)
{
+ Message msg = new Message(topic, "HW".getBytes());
+ if (priority >= 0) {
+ msg.setPriority(priority);
+ }
+ msg.setKeys(key);
+ return msg;
+ }
+}
diff --git
a/test/src/test/java/org/apache/rocketmq/test/offset/OffsetResetForPopIT.java
b/test/src/test/java/org/apache/rocketmq/test/offset/OffsetResetForPopIT.java
index b9798cfd5a..b2092db96a 100644
---
a/test/src/test/java/org/apache/rocketmq/test/offset/OffsetResetForPopIT.java
+++
b/test/src/test/java/org/apache/rocketmq/test/offset/OffsetResetForPopIT.java
@@ -63,6 +63,7 @@ public class OffsetResetForPopIT extends BaseConf {
public void setUp() throws Exception {
// reset pop offset rely on server side offset
brokerController1.getBrokerConfig().setUseServerSideResetOffset(true);
+
brokerController1.getBrokerConfig().setPopConsumerKVServiceEnable(false); // force disable until the fifo resetOffset issue is fixed
adminExt = BaseConf.getAdmin(NAMESRV_ADDR);
adminExt.start();