This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new bdae09ddf2 [RIP-80] #9928 Implementation of Priority Message (#9929)
bdae09ddf2 is described below

commit bdae09ddf22e63122c9ec82ad275450f0c915897
Author: imzs <[email protected]>
AuthorDate: Thu Dec 18 10:18:25 2025 +0800

    [RIP-80] #9928 Implementation of Priority Message (#9929)
    
    * #9928 Implementation of Priority Message
    
    * switch to fastjson2
    
    Change-Id: I3620b00b79a77a93a7cf0fdfac857fb495638ca6
    
    * fix bazel CI, upgrade rocketmq-proto to 2.1.1
    
    Change-Id: Ia66cc7b14b89dc319044ced5ba349315e487c849
    
    * Fix bazel CI
    
    * fix CI, temporarily disable popKv in OffsetResetForPopIT
    
    Change-Id: I0b406cf0ece5067de430c4404aaa1f2dd46edcba
    
    ---------
    
    Co-authored-by: RongtongJin <[email protected]>
---
 WORKSPACE                                          |   2 +-
 .../rocketmq/broker/pop/PopConsumerService.java    |  99 +++++--
 .../broker/processor/PopMessageProcessor.java      |  11 +-
 .../broker/processor/PopReviveService.java         |  51 +++-
 .../broker/processor/SendMessageProcessor.java     |  10 +
 .../broker/pop/PopConsumerServiceTest.java         |   3 +
 .../org/apache/rocketmq/common/BrokerConfig.java   |  31 +-
 .../common/SubscriptionGroupAttributes.java        |   9 +
 .../common/attribute/TopicMessageType.java         |  11 +-
 .../apache/rocketmq/common/message/Message.java    |  13 +
 .../rocketmq/common/message/MessageConst.java      |   2 +
 .../common/attribute/TopicMessageTypeTest.java     |  19 +-
 pom.xml                                            |   2 +-
 .../proxy/grpc/v2/common/GrpcConverter.java        |   6 +
 .../grpc/v2/producer/SendMessageActivity.java      |   6 +
 .../proxy/grpc/v2/route/RouteActivity.java         |   2 +
 .../grpc/v2/producer/SendMessageActivityTest.java  |  28 ++
 .../subscription/SubscriptionGroupConfig.java      |  10 +
 .../test/client/rmq/RMQNormalProducer.java         |   1 +
 .../rocketmq/test/sendresult/ResultWrapper.java    |  10 +
 .../rocketmq/test/util/MQAdminTestUtils.java       |   6 +-
 .../org/apache/rocketmq/test/base/BaseConf.java    |  13 +-
 .../rocketmq/test/base/IntegrationTestBase.java    |   1 +
 .../test/client/consumer/pop/BasePopNormally.java  |   8 +
 .../test/client/consumer/pop/PopPriorityIT.java    | 319 +++++++++++++++++++++
 .../rocketmq/test/offset/OffsetResetForPopIT.java  |   1 +
 26 files changed, 621 insertions(+), 53 deletions(-)

diff --git a/WORKSPACE b/WORKSPACE
index 0f06aa2112..9c532b55ef 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -71,7 +71,7 @@ maven_install(
         "org.bouncycastle:bcpkix-jdk15on:1.69",
         "com.google.code.gson:gson:2.8.9",
         
"com.googlecode.concurrentlinkedhashmap:concurrentlinkedhashmap-lru:1.4.2",
-        "org.apache.rocketmq:rocketmq-proto:2.0.4",
+        "org.apache.rocketmq:rocketmq-proto:2.1.1",
         "com.google.protobuf:protobuf-java:3.20.1",
         "com.google.protobuf:protobuf-java-util:3.20.1",
         "com.conversantmedia:disruptor:1.2.10",
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java 
b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
index 839c96e390..7476a6c206 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
@@ -25,9 +25,11 @@ import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.KeyBuilder;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.PopAckConstants;
 import org.apache.rocketmq.common.ServiceThread;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.TopicFilterType;
+import org.apache.rocketmq.common.attribute.TopicMessageType;
 import org.apache.rocketmq.common.constant.ConsumeInitMode;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.constant.PermName;
@@ -38,6 +40,7 @@ import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageExtBrokerInner;
 import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
 import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
+import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.store.AppendMessageStatus;
 import org.apache.rocketmq.store.GetMessageResult;
 import org.apache.rocketmq.store.GetMessageStatus;
@@ -324,6 +327,23 @@ public class PopConsumerService extends ServiceThread {
         });
     }
 
+    protected CompletableFuture<PopConsumerContext> 
getMessageFromTopicAsync(CompletableFuture<PopConsumerContext> future,
+        String clientHost, String groupId, String topicId, long requestCount, 
int batchSize, MessageFilter filter,
+        PopConsumerRecord.RetryType retryType) {
+        TopicConfig topicConfig = 
this.brokerController.getTopicConfigManager().selectTopicConfig(topicId);
+        if (null == topicConfig) {
+            return future;
+        }
+        for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
+            long index = 
(brokerController.getBrokerConfig().isPriorityOrderAsc() ?
+                topicConfig.getReadQueueNums() - 1 - i : i) + requestCount;
+            int current = (int) index % topicConfig.getReadQueueNums();
+            future = this.getMessageAsync(future, clientHost, groupId,
+                topicId, current, batchSize, filter, retryType);
+        }
+        return future;
+    }
+
     public CompletableFuture<PopConsumerContext> popAsync(String clientHost, 
long popTime, long invisibleTime,
         String groupId, String topicId, int queueId, int batchSize, boolean 
fifo, String attemptId, int initMode,
         MessageFilter filter) {
@@ -336,6 +356,12 @@ public class PopConsumerService extends ServiceThread {
             return CompletableFuture.completedFuture(popConsumerContext);
         }
 
+        SubscriptionGroupConfig subscriptionGroupConfig =
+            
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupId);
+        if (null == subscriptionGroupConfig || 
!subscriptionGroupConfig.isConsumeEnable()) {
+            return CompletableFuture.completedFuture(popConsumerContext);
+        }
+
         log.debug("PopConsumerService popAsync, groupId={}, topicId={}, 
queueId={}, " +
                 "batchSize={}, invisibleTime={}, fifo={}, attemptId={}, 
filter={}",
             groupId, topicId, queueId, batchSize, invisibleTime, fifo, 
attemptId, filter);
@@ -345,7 +371,13 @@ public class PopConsumerService extends ServiceThread {
         String retryTopicV2 = KeyBuilder.buildPopRetryTopicV2(topicId, 
groupId);
         long requestCount = 
Objects.requireNonNull(ConcurrentHashMapUtils.computeIfAbsent(
             requestCountTable, requestKey, k -> new 
AtomicLong(0L))).getAndIncrement();
-        boolean preferRetry = requestCount % 5L == 0L;
+        boolean usePriorityMode = 
TopicMessageType.PRIORITY.equals(topicConfig.getTopicMessageType())
+            && !fifo && requestCount % 100L < 
subscriptionGroupConfig.getPriorityFactor();
+        int probability = usePriorityMode ?
+            brokerConfig.getPopFromRetryProbabilityForPriority() : 
brokerConfig.getPopFromRetryProbability();
+        probability = Math.max(0, Math.min(100, probability)); // [51, 100] 
means always
+        boolean preferRetry = probability > 0 && requestCount % (100 / 
probability) == 0L;
+        requestCount = usePriorityMode ? 0 : requestCount; // use requestCount 
as randomQ
 
         CompletableFuture<PopConsumerContext> getMessageFuture =
             CompletableFuture.completedFuture(popConsumerContext);
@@ -353,13 +385,13 @@ public class PopConsumerService extends ServiceThread {
         try {
             if (!fifo && preferRetry) {
                 if (brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) {
-                    getMessageFuture = this.getMessageAsync(getMessageFuture, 
clientHost, groupId,
-                        retryTopicV1, 0, batchSize, filter, 
PopConsumerRecord.RetryType.RETRY_TOPIC_V1);
+                    getMessageFuture = 
this.getMessageFromTopicAsync(getMessageFuture, clientHost, groupId,
+                        retryTopicV1, requestCount, batchSize, filter, 
PopConsumerRecord.RetryType.RETRY_TOPIC_V1);
                 }
 
                 if (brokerConfig.isEnableRetryTopicV2()) {
-                    getMessageFuture = this.getMessageAsync(getMessageFuture, 
clientHost, groupId,
-                        retryTopicV2, 0, batchSize, filter, 
PopConsumerRecord.RetryType.RETRY_TOPIC_V2);
+                    getMessageFuture = 
this.getMessageFromTopicAsync(getMessageFuture, clientHost, groupId,
+                        retryTopicV2, requestCount, batchSize, filter, 
PopConsumerRecord.RetryType.RETRY_TOPIC_V2);
                 }
             }
 
@@ -367,21 +399,18 @@ public class PopConsumerService extends ServiceThread {
                 getMessageFuture = this.getMessageAsync(getMessageFuture, 
clientHost, groupId,
                     topicId, queueId, batchSize, filter, 
PopConsumerRecord.RetryType.NORMAL_TOPIC);
             } else {
-                for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
-                    int current = (int) ((requestCount + i) % 
topicConfig.getReadQueueNums());
-                    getMessageFuture = this.getMessageAsync(getMessageFuture, 
clientHost, groupId,
-                        topicId, current, batchSize, filter, 
PopConsumerRecord.RetryType.NORMAL_TOPIC);
-                }
+                getMessageFuture = 
this.getMessageFromTopicAsync(getMessageFuture, clientHost, groupId,
+                    topicId, requestCount, batchSize, filter, 
PopConsumerRecord.RetryType.NORMAL_TOPIC);
 
                 if (!fifo && !preferRetry) {
                     if (brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) {
-                        getMessageFuture = 
this.getMessageAsync(getMessageFuture, clientHost, groupId,
-                            retryTopicV1, 0, batchSize, filter, 
PopConsumerRecord.RetryType.RETRY_TOPIC_V1);
+                        getMessageFuture = 
this.getMessageFromTopicAsync(getMessageFuture, clientHost, groupId,
+                            retryTopicV1, requestCount, batchSize, filter, 
PopConsumerRecord.RetryType.RETRY_TOPIC_V1);
                     }
 
                     if (brokerConfig.isEnableRetryTopicV2()) {
-                        getMessageFuture = 
this.getMessageAsync(getMessageFuture, clientHost, groupId,
-                            retryTopicV2, 0, batchSize, filter, 
PopConsumerRecord.RetryType.RETRY_TOPIC_V2);
+                        getMessageFuture = 
this.getMessageFromTopicAsync(getMessageFuture, clientHost, groupId,
+                            retryTopicV2, requestCount, batchSize, filter, 
PopConsumerRecord.RetryType.RETRY_TOPIC_V2);
                     }
                 }
             }
@@ -568,21 +597,33 @@ public class PopConsumerService extends ServiceThread {
         return consumerRecords.size();
     }
 
-    public void createRetryTopicIfNeeded(String groupId, String topicId) {
-        TopicConfig topicConfig = 
brokerController.getTopicConfigManager().selectTopicConfig(topicId);
-        if (topicConfig != null) {
+    public void createRetryTopicIfNeeded(String groupId, String retryTopic) {
+        TopicConfig topicConfig = 
brokerController.getTopicConfigManager().selectTopicConfig(retryTopic);
+        if (topicConfig != null && 
!brokerController.getBrokerConfig().isUseSeparateRetryQueue()) {
             return;
         }
 
-        topicConfig = new TopicConfig(topicId, 1, 1,
+        int retryQueueNum = PopAckConstants.retryQueueNum;
+        if (brokerController.getBrokerConfig().isUseSeparateRetryQueue()) {
+            String normalTopic = KeyBuilder.parseNormalTopic(retryTopic, 
groupId);
+            TopicConfig normalConfig = 
brokerController.getTopicConfigManager().selectTopicConfig(normalTopic); // 
always exists
+            retryQueueNum = normalConfig.getWriteQueueNums();
+            if (topicConfig != null && topicConfig.getWriteQueueNums() == 
normalConfig.getWriteQueueNums()) {
+                return;
+            }
+        }
+
+        topicConfig = new TopicConfig(retryTopic, retryQueueNum, retryQueueNum,
             PermName.PERM_READ | PermName.PERM_WRITE, 0);
         topicConfig.setTopicFilterType(TopicFilterType.SINGLE_TAG);
         
brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
 
-        long offset = 
this.brokerController.getConsumerOffsetManager().queryOffset(groupId, topicId, 
0);
-        if (offset < 0) {
-            this.brokerController.getConsumerOffsetManager().commitOffset(
-                "InitPopOffset", groupId, topicId, 0, 0);
+        for (int i = 0; i < retryQueueNum; i++) {
+            long offset = 
this.brokerController.getConsumerOffsetManager().queryOffset(groupId, 
retryTopic, i);
+            if (offset < 0) {
+                this.brokerController.getConsumerOffsetManager().commitOffset(
+                    "InitPopOffset", groupId, retryTopic, i, 0);
+            }
         }
     }
 
@@ -605,7 +646,7 @@ public class PopConsumerService extends ServiceThread {
         MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
         msgInner.setTopic(retryTopic);
         msgInner.setBody(messageExt.getBody() != null ? messageExt.getBody() : 
new byte[] {});
-        msgInner.setQueueId(0);
+        msgInner.setQueueId(getRetryQueueId(retryTopic, messageExt));
         if (messageExt.getTags() != null) {
             msgInner.setTags(messageExt.getTags());
         } else {
@@ -647,6 +688,18 @@ public class PopConsumerService extends ServiceThread {
         return true;
     }
 
+    private int getRetryQueueId(String retryTopic, MessageExt oriMsg) {
+        if (!brokerController.getBrokerConfig().isUseSeparateRetryQueue()) {
+            return 0;
+        }
+        int oriQueueId = oriMsg.getQueueId(); // original qid of normal or 
retry topic
+        if (oriQueueId > 
brokerController.getTopicConfigManager().selectTopicConfig(retryTopic).getWriteQueueNums()
 - 1) {
+            log.warn("not expected, {}, {}, {}", retryTopic, oriQueueId, 
oriMsg.getMsgId());
+            return 0; // fallback
+        }
+        return oriQueueId;
+    }
+
     // Export kv store record to revive topic
     @SuppressWarnings("ExtractMethodRecommender")
     public synchronized void transferToFsStore() {
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 3144eb973a..c32e1b5ae2 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -39,6 +39,7 @@ import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.PopAckConstants;
 import org.apache.rocketmq.common.ServiceThread;
 import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.attribute.TopicMessageType;
 import org.apache.rocketmq.common.constant.ConsumeInitMode;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.constant.PermName;
@@ -512,11 +513,15 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
         // considered the same type because they share the same retry flag in 
previous fields.
         // Therefore, needRetryV1 is designed as a subset of needRetry, and 
within a single request,
         // only one type of retry topic is able to call popMsgFromQueue.
-        boolean needRetry = randomQ < 
brokerConfig.getPopFromRetryProbability();
+        boolean usePriorityMode = 
TopicMessageType.PRIORITY.equals(topicConfig.getTopicMessageType())
+            && !requestHeader.isOrder() && randomQ < 
subscriptionGroupConfig.getPriorityFactor();
+        boolean needRetry = randomQ < (usePriorityMode ?
+            brokerConfig.getPopFromRetryProbabilityForPriority() : 
brokerConfig.getPopFromRetryProbability());
         boolean needRetryV1 = false;
         if (brokerConfig.isEnableRetryTopicV2() && 
brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) {
             needRetryV1 = randomQ % 2 == 0;
         }
+        randomQ = usePriorityMode ? 0 : randomQ; // reset randomQ
         long popTime = System.currentTimeMillis();
         CompletableFuture<Long> getMessageFuture = 
CompletableFuture.completedFuture(0L);
         if (needRetry && !requestHeader.isOrder()) {
@@ -653,7 +658,9 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
         StringBuilder msgOffsetInfo, StringBuilder orderCountInfo, int 
randomQ, CompletableFuture<Long> getMessageFuture) {
         if (topicConfig != null) {
             for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
-                int queueId = (randomQ + i) % topicConfig.getReadQueueNums();
+                int index = 
(brokerController.getBrokerConfig().isPriorityOrderAsc() ?
+                    topicConfig.getReadQueueNums() - 1 - i : i) + randomQ;
+                int queueId = index % topicConfig.getReadQueueNums();
                 getMessageFuture = getMessageFuture.thenCompose(restNum ->
                     popMsgFromQueue(topicConfig.getTopicName(), 
requestHeader.getAttemptId(), isRetry,
                         getMessageResult, requestHeader, queueId, restNum, 
reviveQid, channel, popTime, messageFilter,
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
index 434812883e..e88879d9c6 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
@@ -112,7 +112,6 @@ public class PopReviveService extends ServiceThread {
             msgInner.setTopic(popCheckPoint.getTopic());
         }
         msgInner.setBody(messageExt.getBody());
-        msgInner.setQueueId(0);
         if (messageExt.getTags() != null) {
             msgInner.setTags(messageExt.getTags());
         } else {
@@ -131,6 +130,7 @@ public class PopReviveService extends ServiceThread {
         msgInner.getProperties().put(MessageConst.PROPERTY_ORIGIN_GROUP, 
popCheckPoint.getCId());
         
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
         addRetryTopicIfNotExist(msgInner.getTopic(), popCheckPoint.getCId());
+        msgInner.setQueueId(getRetryQueueId(msgInner.getTopic(), messageExt));
         PutMessageResult putMessageResult = 
brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
         
brokerController.getBrokerMetricsManager().getPopMetricsManager().incPopReviveRetryMessageCount(popCheckPoint,
 putMessageResult.getPutMessageStatus());
         if (brokerController.getBrokerConfig().isEnablePopLog()) {
@@ -150,30 +150,55 @@ public class PopReviveService extends ServiceThread {
         return true;
     }
 
-    private void initPopRetryOffset(String topic, String consumerGroup) {
-        long offset = 
this.brokerController.getConsumerOffsetManager().queryOffset(consumerGroup, 
topic, 0);
-        if (offset < 0) {
-            
this.brokerController.getConsumerOffsetManager().commitOffset("initPopRetryOffset",
 consumerGroup, topic,
-                0, 0);
+    private void initPopRetryOffset(String retryTopic, String consumerGroup, 
int retryQueueNum) {
+        for (int i = 0; i < retryQueueNum; i++) {
+            long offset = 
this.brokerController.getConsumerOffsetManager().queryOffset(consumerGroup, 
retryTopic, i);
+            if (offset < 0) {
+                
this.brokerController.getConsumerOffsetManager().commitOffset("initPopRetryOffset",
 consumerGroup, retryTopic, i, 0);
+            }
         }
     }
 
-    public void addRetryTopicIfNotExist(String topic, String consumerGroup) {
+    public void addRetryTopicIfNotExist(String retryTopic, String 
consumerGroup) {
         if (brokerController != null) {
-            TopicConfig topicConfig = 
brokerController.getTopicConfigManager().selectTopicConfig(topic);
-            if (topicConfig != null) {
+            TopicConfig topicConfig = 
brokerController.getTopicConfigManager().selectTopicConfig(retryTopic);
+            if (topicConfig != null && 
!brokerController.getBrokerConfig().isUseSeparateRetryQueue()) {
                 return;
             }
-            topicConfig = new TopicConfig(topic);
-            topicConfig.setReadQueueNums(PopAckConstants.retryQueueNum);
-            topicConfig.setWriteQueueNums(PopAckConstants.retryQueueNum);
+
+            int retryQueueNum = PopAckConstants.retryQueueNum;
+            if (brokerController.getBrokerConfig().isUseSeparateRetryQueue()) {
+                String normalTopic = KeyBuilder.parseNormalTopic(retryTopic, 
consumerGroup);
+                TopicConfig normalConfig = 
brokerController.getTopicConfigManager().selectTopicConfig(normalTopic); // 
always exists
+                retryQueueNum = normalConfig.getWriteQueueNums();
+                if (topicConfig != null && topicConfig.getWriteQueueNums() == 
normalConfig.getWriteQueueNums()) {
+                    return;
+                }
+            }
+
+            // create new one, or update in case of queue expansion
+            topicConfig = new TopicConfig(retryTopic);
+            topicConfig.setReadQueueNums(retryQueueNum);
+            topicConfig.setWriteQueueNums(retryQueueNum);
             topicConfig.setTopicFilterType(TopicFilterType.SINGLE_TAG);
             topicConfig.setPerm(6);
             topicConfig.setTopicSysFlag(0);
             
brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
 
-            initPopRetryOffset(topic, consumerGroup);
+            initPopRetryOffset(retryTopic, consumerGroup, retryQueueNum);
+        }
+    }
+
+    private int getRetryQueueId(String retryTopic, MessageExt messageExt) {
+        if (!brokerController.getBrokerConfig().isUseSeparateRetryQueue()) {
+            return 0;
+        }
+        int oriQueueId = messageExt.getQueueId(); // original qid of normal or 
retry topic
+        if (oriQueueId > 
brokerController.getTopicConfigManager().selectTopicConfig(retryTopic).getWriteQueueNums()
 - 1) {
+            POP_LOGGER.warn("not expected, {}, {}, {}", retryTopic, 
oriQueueId, messageExt.getMsgId());
+            return 0; // fallback
         }
+        return oriQueueId;
     }
 
     protected List<MessageExt> getReviveMessage(long offset, int queueId) {
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index eefdb85ccf..c8e7e4c128 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -281,6 +281,16 @@ public class SendMessageProcessor extends 
AbstractSendMessageProcessor implement
         }
 
         MessageAccessor.setProperties(msgInner, oriProps);
+        // check properties to ensure exclusive, don't check topic meta config 
to keep the behavior consistent
+        int msgPriority = msgInner.getPriority();
+        if (msgPriority >= 0) {
+            if 
(TopicMessageType.PRIORITY.equals(TopicMessageType.parseFromMessageProperty(msgInner.getProperties())))
 {
+                queueIdInt = Math.min(msgPriority, 
topicConfig.getWriteQueueNums() - 1);
+                msgInner.setQueueId(queueIdInt);
+            } else {
+                MessageAccessor.clearProperty(msgInner, 
MessageConst.PROPERTY_PRIORITY);
+            }
+        }
 
         CleanupPolicy cleanupPolicy = 
CleanupPolicyUtils.getDeletePolicy(Optional.of(topicConfig));
         if (Objects.equals(cleanupPolicy, CleanupPolicy.COMPACTION)) {
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java
index db5f60fb17..dfa626c885 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java
@@ -37,6 +37,7 @@ import 
org.apache.rocketmq.broker.longpolling.PopLongPollingService;
 import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
 import org.apache.rocketmq.broker.pop.orderly.ConsumerOrderInfoManager;
 import org.apache.rocketmq.broker.processor.PopMessageProcessor;
+import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
 import org.apache.rocketmq.broker.topic.TopicConfigManager;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.KeyBuilder;
@@ -93,6 +94,7 @@ public class PopConsumerServiceTest {
         messageStoreConfig.setStorePathRootDir(filePath);
 
         TopicConfigManager topicConfigManager = 
Mockito.mock(TopicConfigManager.class);
+        SubscriptionGroupManager subscriptionGroupManager = 
Mockito.mock(SubscriptionGroupManager.class);
         ConsumerOffsetManager consumerOffsetManager = 
Mockito.mock(ConsumerOffsetManager.class);
         PopMessageProcessor popMessageProcessor = 
Mockito.mock(PopMessageProcessor.class);
         PopLongPollingService popLongPollingService = 
Mockito.mock(PopLongPollingService.class);
@@ -101,6 +103,7 @@ public class PopConsumerServiceTest {
         brokerController = Mockito.mock(BrokerController.class);
         
Mockito.when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
         
Mockito.when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
+        
Mockito.when(brokerController.getSubscriptionGroupManager()).thenReturn(subscriptionGroupManager);
         
Mockito.when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
         
Mockito.when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager);
         
Mockito.when(brokerController.getPopMessageProcessor()).thenReturn(popMessageProcessor);
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java 
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index a46435543a..5142ed12be 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -240,13 +240,18 @@ public class BrokerConfig extends BrokerIdentity {
     private boolean retrieveMessageFromPopRetryTopicV1 = true;
     private boolean enableRetryTopicV2 = false;
     private int popFromRetryProbability = 20;
+    // pop retry probability for priority mode
+    private int popFromRetryProbabilityForPriority = 0;
+    // 0 as the lowest priority if true
+    private boolean priorityOrderAsc = true;
     private boolean popConsumerFSServiceInit = true;
     private boolean popConsumerKVServiceLog = false;
     private boolean popConsumerKVServiceInit = false;
     private boolean popConsumerKVServiceEnable = false;
     private int popReviveMaxReturnSizePerRead = 16 * 1024;
     private int popReviveMaxAttemptTimes = 16;
-
+    // each message queue will have a corresponding retry queue
+    private boolean useSeparateRetryQueue = false;
     private boolean realTimeNotifyConsumerChange = true;
 
     private boolean litePullMessageEnable = true;
@@ -2177,4 +2182,28 @@ public class BrokerConfig extends BrokerIdentity {
     public void setSplitMetadataSize(int splitMetadataSize) {
         this.splitMetadataSize = splitMetadataSize;
     }
+
+    public int getPopFromRetryProbabilityForPriority() {
+        return popFromRetryProbabilityForPriority;
+    }
+
+    public void setPopFromRetryProbabilityForPriority(int 
popFromRetryProbabilityForPriority) {
+        this.popFromRetryProbabilityForPriority = 
popFromRetryProbabilityForPriority;
+    }
+
+    public boolean isPriorityOrderAsc() {
+        return priorityOrderAsc;
+    }
+
+    public void setPriorityOrderAsc(boolean priorityOrderAsc) {
+        this.priorityOrderAsc = priorityOrderAsc;
+    }
+
+    public boolean isUseSeparateRetryQueue() {
+        return useSeparateRetryQueue;
+    }
+
+    public void setUseSeparateRetryQueue(boolean useSeparateRetryQueue) {
+        this.useSeparateRetryQueue = useSeparateRetryQueue;
+    }
 }
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/SubscriptionGroupAttributes.java
 
b/common/src/main/java/org/apache/rocketmq/common/SubscriptionGroupAttributes.java
index 5b0072401c..845f407939 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/SubscriptionGroupAttributes.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/SubscriptionGroupAttributes.java
@@ -19,11 +19,20 @@ package org.apache.rocketmq.common;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.rocketmq.common.attribute.Attribute;
+import org.apache.rocketmq.common.attribute.LongRangeAttribute;
 
 public class SubscriptionGroupAttributes {
     public static final Map<String, Attribute> ALL;
+    public static final LongRangeAttribute PRIORITY_FACTOR_ATTRIBUTE = new 
LongRangeAttribute(
+        "priority.factor",
+        true,
+        0, // disable priority mode
+        100, // enable priority mode
+        100
+    );
 
     static {
         ALL = new HashMap<>();
+        ALL.put(PRIORITY_FACTOR_ATTRIBUTE.getName(), 
PRIORITY_FACTOR_ATTRIBUTE);
     }
 }
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java
 
b/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java
index 5e581a34ee..9a89d30e8f 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java
@@ -28,6 +28,7 @@ public enum TopicMessageType {
     FIFO("FIFO"),
     DELAY("DELAY"),
     TRANSACTION("TRANSACTION"),
+    PRIORITY("PRIORITY"),
     MIXED("MIXED");
 
     private final String value;
@@ -36,7 +37,8 @@ public enum TopicMessageType {
     }
 
     public static Set<String> topicMessageTypeSet() {
-        return Sets.newHashSet(UNSPECIFIED.value, NORMAL.value, FIFO.value, 
DELAY.value, TRANSACTION.value, MIXED.value);
+        return Sets.newHashSet(UNSPECIFIED.value, NORMAL.value, FIFO.value, 
DELAY.value, TRANSACTION.value,
+            PRIORITY.value, MIXED.value);
     }
 
     public String getValue() {
@@ -44,9 +46,8 @@ public enum TopicMessageType {
     }
 
     public static TopicMessageType parseFromMessageProperty(Map<String, 
String> messageProperty) {
-        String isTrans = 
messageProperty.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
-        String isTransValue = "true";
-        if (isTransValue.equals(isTrans)) {
+        // the parse order keeps message types mutually exclusive
+        if 
(Boolean.parseBoolean(messageProperty.get(MessageConst.PROPERTY_TRANSACTION_PREPARED)))
 {
             return TopicMessageType.TRANSACTION;
         } else if (messageProperty.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL) 
!= null
             || messageProperty.get(MessageConst.PROPERTY_TIMER_DELIVER_MS) != 
null
@@ -55,6 +56,8 @@ public enum TopicMessageType {
             return TopicMessageType.DELAY;
         } else if (messageProperty.get(MessageConst.PROPERTY_SHARDING_KEY) != 
null) {
             return TopicMessageType.FIFO;
+        } else if (messageProperty.get(MessageConst.PROPERTY_PRIORITY) != 
null) {
+            return TopicMessageType.PRIORITY;
         }
         return TopicMessageType.NORMAL;
     }
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/message/Message.java 
b/common/src/main/java/org/apache/rocketmq/common/message/Message.java
index acd4df96d2..b64f3520c1 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/Message.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/Message.java
@@ -16,6 +16,8 @@
  */
 package org.apache.rocketmq.common.message;
 
+import org.apache.commons.lang3.math.NumberUtils;
+
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collection;
@@ -154,6 +156,17 @@ public class Message implements Serializable {
         this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 
String.valueOf(level));
     }
 
+    public void setPriority(int priority) {
+        if (priority < 0) {
+            throw new IllegalArgumentException("The priority must be greater 
than or equal to 0");
+        }
+        this.putProperty(MessageConst.PROPERTY_PRIORITY, 
String.valueOf(priority));
+    }
+
+    public int getPriority() {
+        return 
NumberUtils.toInt(this.getProperty(MessageConst.PROPERTY_PRIORITY), -1);
+    }
+
     public boolean isWaitStoreMsgOK() {
         String result = 
this.getProperty(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);
         if (null == result) {
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java 
b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
index 81f132d134..72078f761d 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
@@ -44,6 +44,7 @@ public class MessageConst {
     public static final String PROPERTY_EXTEND_UNIQ_INFO = "EXTEND_UNIQ_INFO";
     public static final String PROPERTY_MAX_RECONSUME_TIMES = 
"MAX_RECONSUME_TIMES";
     public static final String PROPERTY_CONSUME_START_TIMESTAMP = 
"CONSUME_START_TIME";
+    public static final String PROPERTY_PRIORITY = "_SYS_MSG_PRIORITY_";
     public static final String PROPERTY_INNER_NUM = "INNER_NUM";
     public static final String PROPERTY_INNER_BASE = "INNER_BASE";
     public static final String DUP_INFO = "DUP_INFO";
@@ -169,5 +170,6 @@ public class MessageConst {
         STRING_HASH_SET.add(PROPERTY_DLQ_ORIGIN_TOPIC);
         STRING_HASH_SET.add(PROPERTY_DLQ_ORIGIN_MESSAGE_ID);
         STRING_HASH_SET.add(PROPERTY_CRC32);
+        STRING_HASH_SET.add(PROPERTY_PRIORITY);
     }
 }
diff --git 
a/common/src/test/java/org/apache/rocketmq/common/attribute/TopicMessageTypeTest.java
 
b/common/src/test/java/org/apache/rocketmq/common/attribute/TopicMessageTypeTest.java
index 0321679ccc..79402ca1b2 100644
--- 
a/common/src/test/java/org/apache/rocketmq/common/attribute/TopicMessageTypeTest.java
+++ 
b/common/src/test/java/org/apache/rocketmq/common/attribute/TopicMessageTypeTest.java
@@ -33,6 +33,7 @@ public class TopicMessageTypeTest {
     private Map<String, String> transactionMessageProperty;
     private Map<String, String> delayMessageProperty;
     private Map<String, String> fifoMessageProperty;
+    private Map<String, String> priorityMessageProperty;
 
     @Before
     public void setUp() {
@@ -40,15 +41,18 @@ public class TopicMessageTypeTest {
         transactionMessageProperty = new HashMap<>();
         delayMessageProperty = new HashMap<>();
         fifoMessageProperty = new HashMap<>();
+        priorityMessageProperty = new HashMap<>();
 
         
transactionMessageProperty.put(MessageConst.PROPERTY_TRANSACTION_PREPARED, 
"true");
         delayMessageProperty.put(MessageConst.PROPERTY_DELAY_TIME_LEVEL, "1");
         fifoMessageProperty.put(MessageConst.PROPERTY_SHARDING_KEY, 
"shardingKey");
+        priorityMessageProperty.put(MessageConst.PROPERTY_PRIORITY, "3");
     }
 
     @Test
     public void testTopicMessageTypeSet() {
-        Set<String> expectedSet = Sets.newHashSet("UNSPECIFIED", "NORMAL", 
"FIFO", "DELAY", "TRANSACTION", "MIXED");
+        Set<String> expectedSet
+            = Sets.newHashSet("UNSPECIFIED", "NORMAL", "FIFO", "DELAY", 
"TRANSACTION", "PRIORITY", "MIXED");
         Set<String> actualSet = TopicMessageType.topicMessageTypeSet();
         assertEquals(expectedSet, actualSet);
     }
@@ -77,6 +81,12 @@ public class TopicMessageTypeTest {
         assertEquals(TopicMessageType.FIFO, actual);
     }
 
+    @Test
+    public void testParseFromMessageProperty_Priority() {
+        TopicMessageType actual = 
TopicMessageType.parseFromMessageProperty(priorityMessageProperty);
+        assertEquals(TopicMessageType.PRIORITY, actual);
+    }
+
     @Test
     public void testGetMetricsValue() {
         for (TopicMessageType type : TopicMessageType.values()) {
@@ -116,6 +126,13 @@ public class TopicMessageTypeTest {
         properties.put(MessageConst.PROPERTY_SHARDING_KEY, "sharding_key");
         Assert.assertEquals(TopicMessageType.FIFO, 
TopicMessageType.parseFromMessageProperty(properties));
 
+        // PRIORITY
+        properties.clear();
+        properties.put(MessageConst.PROPERTY_PRIORITY, "3");
+        Assert.assertEquals(TopicMessageType.PRIORITY, 
TopicMessageType.parseFromMessageProperty(properties));
+        properties.put(MessageConst.PROPERTY_DELAY_TIME_LEVEL, "3");
+        Assert.assertEquals(TopicMessageType.DELAY, 
TopicMessageType.parseFromMessageProperty(properties));
+
         // NORMAL
         properties.clear();
         Assert.assertEquals(TopicMessageType.NORMAL, 
TopicMessageType.parseFromMessageProperty(properties));
diff --git a/pom.xml b/pom.xml
index 4343c3a574..088dda8b77 100644
--- a/pom.xml
+++ b/pom.xml
@@ -127,7 +127,7 @@
         <annotations-api.version>6.0.53</annotations-api.version>
         <extra-enforcer-rules.version>1.0-beta-4</extra-enforcer-rules.version>
         
<concurrentlinkedhashmap-lru.version>1.4.2</concurrentlinkedhashmap-lru.version>
-        <rocketmq-proto.version>2.0.4</rocketmq-proto.version>
+        <rocketmq-proto.version>2.1.1</rocketmq-proto.version>
         <grpc.version>1.53.0</grpc.version>
         <protobuf.version>3.20.1</protobuf.version>
         <disruptor.version>1.2.10</disruptor.version>
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java
index 33a4e1312f..4ce3dc831d 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java
@@ -212,6 +212,12 @@ public class GrpcConverter {
             }
         }
 
+        // priority
+        int priority = messageExt.getPriority();
+        if (priority >= 0) {
+            systemPropertiesBuilder.setPriority(priority);
+        }
+
         // sharding key
         String shardingKey = 
messageExt.getProperty(MessageConst.PROPERTY_SHARDING_KEY);
         if (shardingKey != null) {
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java
index f7b8014bb9..2c3ffd1305 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java
@@ -262,6 +262,12 @@ public class SendMessageActivity extends 
AbstractMessingActivity {
         // set delay level or deliver timestamp
         fillDelayMessageProperty(message, messageWithHeader);
 
+        // set priority
+        if (message.getSystemProperties().hasPriority()) {
+            int priority = message.getSystemProperties().getPriority();
+            messageWithHeader.setPriority(priority);
+        }
+
         // set reconsume times
         int reconsumeTimes = 
message.getSystemProperties().getDeliveryAttempt();
         MessageAccessor.setReconsumeTime(messageWithHeader, 
String.valueOf(reconsumeTimes));
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
index a4e79a856f..76f86b436d 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
@@ -306,6 +306,8 @@ public class RouteActivity extends AbstractMessingActivity {
                 return Collections.singletonList(MessageType.TRANSACTION);
             case DELAY:
                 return Collections.singletonList(MessageType.DELAY);
+            case PRIORITY:
+                return Collections.singletonList(MessageType.PRIORITY);
             case MIXED:
                 return Arrays.asList(MessageType.NORMAL, MessageType.FIFO, 
MessageType.DELAY, MessageType.TRANSACTION);
             default:
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java
index 4882a5ed8b..a64867ddfe 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java
@@ -268,6 +268,34 @@ public class SendMessageActivityTest extends 
BaseActivityTest {
         assertEquals(MessageSysFlag.TRANSACTION_PREPARED_TYPE | 
MessageSysFlag.COMPRESSED_FLAG, sendMessageActivity.buildSysFlag(message));
     }
 
+    @Test
+    public void testPriorityMessage() {
+        String msgId = MessageClientIDSetter.createUniqID();
+        Message message = Message.newBuilder()
+            .setTopic(Resource.newBuilder()
+                .setName(TOPIC)
+                .build())
+            .setSystemProperties(SystemProperties.newBuilder()
+                .setMessageId(msgId)
+                .setQueueId(0)
+                .setMessageType(MessageType.PRIORITY)
+                .setPriority(5)
+                .setBodyEncoding(Encoding.GZIP)
+                
.setBornTimestamp(Timestamps.fromMillis(System.currentTimeMillis()))
+                
.setBornHost(StringUtils.defaultString(NetworkUtil.getLocalAddress(), 
"127.0.0.1:1234"))
+                .build())
+            .setBody(ByteString.copyFromUtf8("123"))
+            .build();
+        org.apache.rocketmq.common.message.Message messageExt = 
this.sendMessageActivity.buildMessage(null,
+            Lists.newArrayList(
+                message
+            ),
+            Resource.newBuilder().setName(TOPIC).build()).get(0);
+
+        assertEquals(MessageClientIDSetter.getUniqID(messageExt), msgId);
+        assertEquals(5, messageExt.getPriority());
+    }
+
     @Test
     public void testSendOrderMessageQueueSelector() throws Exception {
         TopicRouteData topicRouteData = new TopicRouteData();
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/subscription/SubscriptionGroupConfig.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/subscription/SubscriptionGroupConfig.java
index c9c2a8090c..2c3738a464 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/subscription/SubscriptionGroupConfig.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/subscription/SubscriptionGroupConfig.java
@@ -17,13 +17,17 @@
 
 package org.apache.rocketmq.remoting.protocol.subscription;
 
+import com.alibaba.fastjson2.annotation.JSONField;
 import com.google.common.base.MoreObjects;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.rocketmq.common.MixAll;
 
+import static 
org.apache.rocketmq.common.SubscriptionGroupAttributes.PRIORITY_FACTOR_ATTRIBUTE;
+
 public class SubscriptionGroupConfig {
 
     private String groupName;
@@ -173,6 +177,12 @@ public class SubscriptionGroupConfig {
         this.attributes = attributes;
     }
 
+    @JSONField(serialize = false, deserialize = false)
+    public long getPriorityFactor() {
+        String factorStr = null == attributes ? null : 
attributes.get(PRIORITY_FACTOR_ATTRIBUTE.getName());
+        return NumberUtils.toLong(factorStr, 
PRIORITY_FACTOR_ATTRIBUTE.getDefaultValue());
+    }
+
     @Override
     public int hashCode() {
         final int prime = 31;
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java 
b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java
index 7df189a915..75444d3a1f 100644
--- 
a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java
+++ 
b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java
@@ -110,6 +110,7 @@ public class RMQNormalProducer extends AbstractMQProducer {
             msgBodys.addData(new String(message.getBody(), 
StandardCharsets.UTF_8));
             originMsgs.addData(msg);
             originMsgIndex.put(new String(message.getBody(), 
StandardCharsets.UTF_8), internalSendResult);
+            sendResult.setSendResultObj(internalSendResult);
         } catch (Exception e) {
             if (isDebug) {
                 e.printStackTrace();
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/sendresult/ResultWrapper.java 
b/test/src/main/java/org/apache/rocketmq/test/sendresult/ResultWrapper.java
index 9fe31463e4..d9a5987ff4 100644
--- a/test/src/main/java/org/apache/rocketmq/test/sendresult/ResultWrapper.java
+++ b/test/src/main/java/org/apache/rocketmq/test/sendresult/ResultWrapper.java
@@ -17,11 +17,14 @@
 
 package org.apache.rocketmq.test.sendresult;
 
+import org.apache.rocketmq.client.producer.SendResult;
+
 public class ResultWrapper {
     private boolean sendResult = false;
     private String msgId = null;
     private Exception sendException = null;
     private String brokerIp = null;
+    private SendResult sendResultObj = null;
 
     public String getBrokerIp() {
         return brokerIp;
@@ -55,6 +58,13 @@ public class ResultWrapper {
         this.sendException = sendException;
     }
 
+    public SendResult getSendResultObj() {
+        return sendResultObj;
+    }
+    public void setSendResultObj(SendResult sendResultObj) {
+        this.sendResultObj = sendResultObj;
+    }
+
     @Override
     public String toString() {
         return String.format("sendstatus:%s msgId:%s", sendResult, msgId);
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java 
b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
index 276d08d806..3b6154ae6b 100644
--- a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
+++ b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
@@ -104,12 +104,10 @@ public class MQAdminTestUtils {
         return createResult;
     }
 
-    public static boolean createSub(String nameSrvAddr, String clusterName, 
String consumerId) {
+    public static boolean createSub(String nameSrvAddr, String clusterName, 
SubscriptionGroupConfig config) {
         boolean createResult = true;
         DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt();
         mqAdminExt.setNamesrvAddr(nameSrvAddr);
-        SubscriptionGroupConfig config = new SubscriptionGroupConfig();
-        config.setGroupName(consumerId);
         try {
             mqAdminExt.start();
             Set<String> masterSet = 
CommandUtil.fetchMasterAddrByClusterName(mqAdminExt,
@@ -117,7 +115,7 @@ public class MQAdminTestUtils {
             for (String addr : masterSet) {
                 try {
                     mqAdminExt.createAndUpdateSubscriptionGroupConfig(addr, 
config);
-                    log.info("create subscription group {} to {} success.", 
consumerId, addr);
+                    log.info("create subscription group {} to {} success.", 
config.getGroupName(), addr);
                 } catch (Exception e) {
                     e.printStackTrace();
                     Thread.sleep(1000 * 1);
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java 
b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
index 472e106ce3..50741ba091 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
@@ -40,6 +40,7 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.namesrv.NamesrvController;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.route.BrokerData;
+import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer;
 import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
 import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
@@ -203,15 +204,21 @@ public class BaseConf {
     }
 
     public static String initConsumerGroup() {
-        String group = MQRandomUtils.getRandomConsumerGroup();
-        return initConsumerGroup(group);
+        return initConsumerGroup(MQRandomUtils.getRandomConsumerGroup());
     }
 
     public static String initConsumerGroup(String group) {
-        MQAdminTestUtils.createSub(NAMESRV_ADDR, CLUSTER_NAME, group);
+        SubscriptionGroupConfig config = new SubscriptionGroupConfig();
+        config.setGroupName(group);
+        MQAdminTestUtils.createSub(NAMESRV_ADDR, CLUSTER_NAME, config);
         return group;
     }
 
+    public static String initConsumerGroup(SubscriptionGroupConfig config) {
+        MQAdminTestUtils.createSub(NAMESRV_ADDR, CLUSTER_NAME, config);
+        return config.getGroupName();
+    }
+
     public static DefaultMQAdminExt getAdmin(String nsAddr) {
         final DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt(3 * 1000);
         mqAdminExt.setNamesrvAddr(nsAddr);
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java 
b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
index 287e54d561..cfcb989649 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
@@ -141,6 +141,7 @@ public class IntegrationTestBase {
         brokerConfig.setRecallMessageEnable(true);
         storeConfig.setEnableConsumeQueueExt(true);
         brokerConfig.setLoadBalancePollNameServerInterval(500);
+        brokerConfig.setPopConsumerKVServiceInit(true);
         storeConfig.setStorePathRootDir(baseDir);
         storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog");
         storeConfig.setMappedFileSizeCommitLog(commitLogSize);
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopNormally.java
 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopNormally.java
index 2e29b95a5a..d4a1b3be5a 100644
--- 
a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopNormally.java
+++ 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopNormally.java
@@ -18,11 +18,15 @@
 package org.apache.rocketmq.test.client.consumer.pop;
 
 import java.util.concurrent.CompletableFuture;
+
+import org.apache.rocketmq.client.consumer.AckResult;
 import org.apache.rocketmq.client.consumer.PopResult;
 import org.apache.rocketmq.common.attribute.CQType;
 import org.apache.rocketmq.common.attribute.TopicMessageType;
 import org.apache.rocketmq.common.constant.ConsumeInitMode;
 import org.apache.rocketmq.common.filter.ExpressionType;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.test.base.IntegrationTestBase;
 import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
@@ -69,4 +73,8 @@ public class BasePopNormally extends BasePop {
             brokerAddr, messageQueue, invisibleTime, maxNums, group, 3000, 
false,
             ConsumeInitMode.MIN, false, ExpressionType.TAG, "*");
     }
+
+    protected CompletableFuture<AckResult> ackMessageAsync(MessageExt 
messageExt) {
+        return client.ackMessageAsync(brokerAddr, topic, group, 
messageExt.getProperty(MessageConst.PROPERTY_POP_CK));
+    }
 }
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopPriorityIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopPriorityIT.java
new file mode 100644
index 0000000000..98f7ae55bd
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopPriorityIT.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.consumer.pop;
+
+import org.apache.rocketmq.client.consumer.PopResult;
+import org.apache.rocketmq.client.consumer.PopStatus;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.attribute.AttributeParser;
+import org.apache.rocketmq.common.attribute.CQType;
+import org.apache.rocketmq.common.attribute.TopicMessageType;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.test.base.IntegrationTestBase;
+import org.apache.rocketmq.test.util.TestUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.rocketmq.common.SubscriptionGroupAttributes.PRIORITY_FACTOR_ATTRIBUTE;
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class PopPriorityIT extends BasePopNormally {
+
+    private final boolean popConsumerKVServiceEnable;
+    private final boolean priorityOrderAsc;
+    private int writeQueueNum = 8;
+
+    public PopPriorityIT(boolean popConsumerKVServiceEnable, boolean 
priorityOrderAsc) {
+        this.popConsumerKVServiceEnable = popConsumerKVServiceEnable;
+        this.priorityOrderAsc = priorityOrderAsc;
+    }
+
+    @Parameterized.Parameters
+    public static List<Object[]> params() {
+        List<Object[]> result = new ArrayList<>();
+        result.add(new Object[] {false, true});
+        result.add(new Object[] {false, false});
+        result.add(new Object[] {true, true});
+        result.add(new Object[] {true, false});
+        return result;
+    }
+
+    @Before
+    public void setUp() {
+        super.setUp();
+        // reset default config if changed
+        writeQueueNum = 8;
+        
brokerController1.getBrokerConfig().setPopFromRetryProbabilityForPriority(0);
+        brokerController1.getBrokerConfig().setUseSeparateRetryQueue(false);
+        
brokerController1.getBrokerConfig().setPopConsumerKVServiceEnable(popConsumerKVServiceEnable);
+        
brokerController1.getBrokerConfig().setPriorityOrderAsc(priorityOrderAsc);
+        IntegrationTestBase.initTopic(topic, NAMESRV_ADDR, BROKER1_NAME, 
writeQueueNum, CQType.SimpleCQ, TopicMessageType.PRIORITY);
+    }
+
+    @After
+    public void tearDown() {
+        super.tearDown();
+    }
+
+    @Test
+    public void test_normal_send() {
+        int priority = -1; // normal message
+        Set<Integer> queueIdSet = new HashSet<>();
+        for (int i = 0; i < 32; i++) {
+            Message message = mockMessage(topic, priority, "");
+            SendResult sendResult = producer.send(message, 
null).getSendResultObj();
+            queueIdSet.add(sendResult.getMessageQueue().getQueueId());
+        }
+        assertTrue(queueIdSet.size() > 1);
+    }
+
+    @Test
+    public void test_priority_send() {
+        final int priority = 0; // priority message
+        for (int i = 0; i < 32; i++) {
+            Message message = mockMessage(topic, priority, "");
+            SendResult sendResult = producer.send(message, 
null).getSendResultObj();
+            assertEquals(priority, sendResult.getMessageQueue().getQueueId());
+        }
+    }
+
+    @Test
+    public void test_priority_consume_always_high_priority() throws Exception {
+        int msgNumPerQueue = 20;
+        final int maxPriority = priorityOrderAsc ? writeQueueNum - 1 : 0;
+        for (int i = 0; i < writeQueueNum; i++) {
+            Message message = mockMessage(topic, i, String.valueOf(i));
+            for (int j = 0; j < msgNumPerQueue; j++) {
+                producer.send(message);
+            }
+        }
+        Assert.assertTrue(awaitDispatchMs(2000));
+        for (int i = 0; i < msgNumPerQueue; i++) {
+            PopResult popResult = 
popMessageAsync(Duration.ofSeconds(600).toMillis(), 1, 30000).get();
+            TestUtil.waitForMonment(20); // wait lock release
+            assertEquals(PopStatus.FOUND, popResult.getPopStatus());
+            MessageExt message = popResult.getMsgFoundList().get(0);
+            assertEquals(maxPriority, message.getPriority()); // not a 
coincidence
+        }
+    }
+
+    @Test
+    public void test_priority_consume_from_high_to_low() throws Exception {
+        for (int i = 0; i < writeQueueNum; i++) {
+            Message message = mockMessage(topic, i, String.valueOf(i));
+            producer.send(message);
+        }
+        Assert.assertTrue(awaitDispatchMs(2000));
+        for (int i = 0; i < writeQueueNum; i++) {
+            PopResult popResult = 
popMessageAsync(Duration.ofSeconds(30).toMillis(), 1, 30000).get();
+            assertEquals(PopStatus.FOUND, popResult.getPopStatus());
+            MessageExt message = popResult.getMsgFoundList().get(0);
+            int expectPriority = priorityOrderAsc ? writeQueueNum - 1 - i : i;
+            assertEquals(0, message.getQueueOffset());
+            assertEquals(expectPriority, message.getQueueId());
+            assertEquals(expectPriority, message.getPriority());
+        }
+    }
+
+    @Test
+    public void test_priority_consume_disable() throws Exception {
+        SubscriptionGroupConfig config = new SubscriptionGroupConfig();
+        config.setGroupName(group);
+        config.setAttributes(AttributeParser.parseToMap("+" + 
PRIORITY_FACTOR_ATTRIBUTE.getName() + "=0"));
+        initConsumerGroup(config);
+
+        int msgNumPerQueue = 200;
+        for (int i = 0; i < writeQueueNum; i++) {
+            Message message = mockMessage(topic, i, String.valueOf(i));
+            for (int j = 0; j < msgNumPerQueue; j++) {
+                producer.send(message);
+            }
+        }
+        Assert.assertTrue(awaitDispatchMs(2000));
+        int sampleCount = 800;
+        int[] queueIdCount = new int[writeQueueNum];
+        for (int i = 0; i < sampleCount; i++) {
+            PopResult popResult = 
popMessageAsync(Duration.ofSeconds(600).toMillis(), 1, 30000).get();
+            TestUtil.waitForMonment(10); // wait lock release
+            assertEquals(PopStatus.FOUND, popResult.getPopStatus());
+            MessageExt message = popResult.getMsgFoundList().get(0);
+            queueIdCount[message.getQueueId()] = 
queueIdCount[message.getQueueId()] + 1;
+        }
+
+        double expectAverage = (double) sampleCount / writeQueueNum;
+        for (int count : queueIdCount) {
+            assertTrue(Math.abs(count - expectAverage) < expectAverage * 0.4);
+        }
+    }
+
+    @Test
+    public void test_priority_consume_retry_as_lowest() throws Exception {
+        // retry as lowest by default
+        int count = 100;
+        for (int i = 0; i < count; i++) {
+            Message message = mockMessage(topic, new 
Random().nextInt(writeQueueNum), String.valueOf(i));
+            producer.send(message);
+        }
+        int invisibleTime = 3;
+        PopResult popResult = 
popMessageAsync(Duration.ofSeconds(invisibleTime).toMillis(), 1, 30000).get();
+        assertEquals(PopStatus.FOUND, popResult.getPopStatus());
+        String retryId = popResult.getMsgFoundList().get(0).getMsgId();
+        TestUtil.waitForSeconds(invisibleTime + 3);
+        Assert.assertTrue(awaitDispatchMs(2000));
+
+        List<MessageExt> collect = new ArrayList<>();
+        await()
+            .pollInterval(1, TimeUnit.SECONDS)
+            .atMost(35, TimeUnit.SECONDS)
+            .until(() -> {
+                PopResult result = 
popMessageAsync(Duration.ofSeconds(600).toMillis(), 32, 5000).get();
+                if (PopStatus.FOUND.equals(result.getPopStatus())) {
+                    collect.addAll(result.getMsgFoundList());
+                    return false;
+                }
+                return true;
+            });
+
+        assertEquals(count, collect.size());
+        assertEquals(1, collect.get(collect.size() - 1).getReconsumeTimes());
+        assertEquals(retryId, collect.get(collect.size() - 1).getMsgId());
+    }
+
+    @Test
+    public void test_priority_consume_retry_as_highest() throws Exception {
+        
brokerController1.getBrokerConfig().setPopFromRetryProbabilityForPriority(100);
+        int count = 100;
+        for (int i = 0; i < count; i++) {
+            Message message = mockMessage(topic, new 
Random().nextInt(writeQueueNum), String.valueOf(i));
+            producer.send(message);
+        }
+        int invisibleTime = 3;
+        PopResult popResult = 
popMessageAsync(Duration.ofSeconds(invisibleTime).toMillis(), 1, 30000).get();
+        assertEquals(PopStatus.FOUND, popResult.getPopStatus());
+        String retryId = popResult.getMsgFoundList().get(0).getMsgId();
+        TestUtil.waitForSeconds(invisibleTime + 3);
+        Assert.assertTrue(awaitDispatchMs(2000));
+
+        List<MessageExt> collect = new ArrayList<>();
+        await()
+            .pollInterval(1, TimeUnit.SECONDS)
+            .atMost(35, TimeUnit.SECONDS)
+            .until(() -> {
+                PopResult result = 
popMessageAsync(Duration.ofSeconds(600).toMillis(), 32, 5000).get();
+                if (PopStatus.FOUND.equals(result.getPopStatus())) {
+                    collect.addAll(result.getMsgFoundList());
+                    return false;
+                }
+                return true;
+            });
+
+        assertEquals(count, collect.size());
+        assertEquals(1, collect.get(0).getReconsumeTimes());
+        assertEquals(retryId, collect.get(0).getMsgId());
+    }
+
+    @Test
+    public void test_priority_consume_use_separate_retry_queue() throws 
Exception {
+        brokerController1.getBrokerConfig().setUseSeparateRetryQueue(true);
+        
brokerController1.getBrokerConfig().setPopFromRetryProbabilityForPriority(100);
+        for (int i = 0; i < writeQueueNum; i++) {
+            Message message = mockMessage(topic, i, String.valueOf(i));
+            producer.send(message);
+        }
+        Assert.assertTrue(awaitDispatchMs(2000));
+        int invisibleTime = 3;
+        PopResult popResult = 
popMessageAsync(Duration.ofSeconds(invisibleTime).toMillis(), writeQueueNum, 
30000).get();
+        assertEquals(PopStatus.FOUND, popResult.getPopStatus());
+        assertEquals(writeQueueNum, popResult.getMsgFoundList().size());
+        TestUtil.waitForSeconds(invisibleTime + 3);
+
+        popResult = popMessageAsync(Duration.ofSeconds(600).toMillis(), 32, 
10000).get();
+        assertEquals(PopStatus.FOUND, popResult.getPopStatus());
+        assertEquals(writeQueueNum, popResult.getMsgFoundList().size());
+        for (int i = 0; i < writeQueueNum; i++) {
+            MessageExt message = popResult.getMsgFoundList().get(i);
+            assertEquals(0, message.getQueueOffset()); // means a separate 
retry queue
+            assertEquals(1, message.getReconsumeTimes());
+            int expectPriority = priorityOrderAsc ? writeQueueNum - 1 - i : i;
+            assertEquals(expectPriority, message.getQueueId());
+            assertEquals(expectPriority, message.getPriority());
+        }
+    }
+
+    @Test
+    public void 
test_priority_consume_use_separate_retry_queue_with_queue_expansion() throws 
Exception {
+        // retry as lowest by default
+        brokerController1.getBrokerConfig().setUseSeparateRetryQueue(true);
+        for (int i = 0; i < writeQueueNum; i++) {
+            Message message = mockMessage(topic, i, String.valueOf(i));
+            producer.send(message);
+        }
+        Assert.assertTrue(awaitDispatchMs(2000));
+        int invisibleTime = 3;
+        PopResult popResult = 
popMessageAsync(Duration.ofSeconds(invisibleTime).toMillis(), writeQueueNum, 
30000).get();
+        assertEquals(PopStatus.FOUND, popResult.getPopStatus());
+        assertEquals(writeQueueNum, popResult.getMsgFoundList().size());
+        TestUtil.waitForSeconds(invisibleTime + 3); // wait retry created
+
+        writeQueueNum = writeQueueNum * 2;
+        IntegrationTestBase.initTopic(topic, NAMESRV_ADDR, BROKER1_NAME, 
writeQueueNum, CQType.SimpleCQ, TopicMessageType.PRIORITY);
+        for (int i = writeQueueNum / 2; i < writeQueueNum; i++) {
+            Message message = mockMessage(topic, i, String.valueOf(i));
+            producer.send(message);
+        }
+        Assert.assertTrue(awaitDispatchMs(2000));
+
+        popResult = 
popMessageAsync(Duration.ofSeconds(invisibleTime).toMillis(), 32, 5000).get();
+        List<MessageExt> msgList = popResult.getMsgFoundList();
+        // asc == true, collect: [15 -> 8, 7 -> 0]
+        // asc == false, collect: [8 -> 15, 0 -> 7]
+        assertEquals(writeQueueNum, msgList.size());
+        assertEquals(priorityOrderAsc ? writeQueueNum - 1 : writeQueueNum / 2, 
msgList.get(0).getQueueId());
+        assertEquals(priorityOrderAsc ? writeQueueNum - 1 : writeQueueNum / 2, 
msgList.get(0).getPriority());
+        assertEquals(priorityOrderAsc ? 0 : writeQueueNum / 2 - 1, 
msgList.get(msgList.size() - 1).getQueueId());
+        assertEquals(priorityOrderAsc ? 0 : writeQueueNum / 2 - 1, 
msgList.get(msgList.size() - 1).getPriority());
+        assertEquals(1, msgList.get(msgList.size() - 1).getReconsumeTimes());
+        assertEquals(0, msgList.get(msgList.size() - 1).getQueueOffset()); // 
means a separate retry queue
+    }
+
+    private static Message mockMessage(String topic, int priority, String key) 
{
+        Message msg = new Message(topic, "HW".getBytes());
+        if (priority >= 0) {
+            msg.setPriority(priority);
+        }
+        msg.setKeys(key);
+        return msg;
+    }
+}
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/offset/OffsetResetForPopIT.java 
b/test/src/test/java/org/apache/rocketmq/test/offset/OffsetResetForPopIT.java
index b9798cfd5a..b2092db96a 100644
--- 
a/test/src/test/java/org/apache/rocketmq/test/offset/OffsetResetForPopIT.java
+++ 
b/test/src/test/java/org/apache/rocketmq/test/offset/OffsetResetForPopIT.java
@@ -63,6 +63,7 @@ public class OffsetResetForPopIT extends BaseConf {
     public void setUp() throws Exception {
         // reset pop offset rely on server side offset
         brokerController1.getBrokerConfig().setUseServerSideResetOffset(true);
+        
brokerController1.getBrokerConfig().setPopConsumerKVServiceEnable(false); // 
force disable before fifo resetOffset issue fixed
 
         adminExt = BaseConf.getAdmin(NAMESRV_ADDR);
         adminExt.start();

Reply via email to