This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new d663bcf6a [ISSUE #5292] [RIP-48] Support reset offset in server-side to improve the success rate (#5293)
d663bcf6a is described below

commit d663bcf6aa9e171f079e9bd85d65b1d907f4383b
Author: lizhimins <[email protected]>
AuthorDate: Fri Oct 14 14:02:08 2022 +0800

    [ISSUE #5292] [RIP-48] Support reset offset in server-side to improve the success rate (#5293)
    
    * [RIP-48] Support reset offset in server side to improve the success rate
    
    * [RIP-48] Support reset offset in server side to improve the success rate
    
    * [RIP-48] Fix code style
    
    * fix unit test
    
    * no need put offset in reset command header
    
    Co-authored-by: 斜阳 <[email protected]>
---
 .../broker/offset/ConsumerOffsetManager.java       |  69 +++++++-
 .../broker/offset/LmqConsumerOffsetManager.java    |   2 +-
 .../broker/processor/AdminBrokerProcessor.java     | 103 ++++++++++++
 .../broker/processor/ConsumerManageProcessor.java  |  57 +++++--
 .../broker/processor/PullMessageProcessor.java     |  32 +++-
 .../subscription/SubscriptionGroupManager.java     |  10 ++
 .../broker/offset/ConsumerOffsetManagerTest.java   |  33 +++-
 .../rocketmq/client/impl/MQClientAPIImpl.java      |  36 +++++
 .../org/apache/rocketmq/common/BrokerConfig.java   |  10 ++
 .../common/protocol/body/ResetOffsetBody.java      |   6 +
 .../protocol/header/ResetOffsetRequestHeader.java  |  24 +++
 .../apache/rocketmq/store/GetMessageStatus.java    |   2 +
 .../rocketmq/test/listener/AbstractListener.java   |  41 ++---
 .../listener/rmq/concurrent/RMQNormalListener.java |   9 +-
 .../org/apache/rocketmq/test/base/BaseConf.java    |  17 +-
 .../apache/rocketmq/test/offset/OffsetResetIT.java | 174 +++++++++++++++++++++
 .../tools/admin/DefaultMQAdminExtImpl.java         |  13 ++
 .../command/offset/ResetOffsetByTimeCommand.java   |  61 ++++----
 .../offset/ResetOffsetByTimeOldCommand.java        |  18 ++-
 19 files changed, 625 insertions(+), 92 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
index 02509f60d..5522d232c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.broker.offset;
 
+import com.google.common.base.Strings;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -41,12 +42,15 @@ public class ConsumerOffsetManager extends ConfigManager {
 
     private DataVersion dataVersion = new DataVersion();
 
-    protected ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, 
Long>> offsetTable =
+    private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, 
Long>> offsetTable =
+        new ConcurrentHashMap<>(512);
+
+    private final ConcurrentMap<String, ConcurrentMap<Integer, Long>> 
resetOffsetTable =
         new ConcurrentHashMap<>(512);
 
     protected transient BrokerController brokerController;
 
-    private transient AtomicLong versionChangeCounter = new AtomicLong(0);
+    private final transient AtomicLong versionChangeCounter = new 
AtomicLong(0);
 
     public ConsumerOffsetManager() {
     }
@@ -204,6 +208,14 @@ public class ConsumerOffsetManager extends ConfigManager {
     public long queryOffset(final String group, final String topic, final int 
queueId) {
         // topic@group
         String key = topic + TOPIC_GROUP_SEPARATOR + group;
+
+        if 
(this.brokerController.getBrokerConfig().isUseServerSideResetOffset()) {
+            Map<Integer, Long> reset = resetOffsetTable.get(key);
+            if (null != reset && reset.containsKey(queueId)) {
+                return reset.get(queueId);
+            }
+        }
+
         ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
         if (null != map) {
             Long offset = map.get(queueId);
@@ -215,6 +227,7 @@ public class ConsumerOffsetManager extends ConfigManager {
         return -1;
     }
 
+    @Override
     public String encode() {
         return this.encode(false);
     }
@@ -229,7 +242,7 @@ public class ConsumerOffsetManager extends ConfigManager {
         if (jsonString != null) {
             ConsumerOffsetManager obj = 
RemotingSerializable.fromJson(jsonString, ConsumerOffsetManager.class);
             if (obj != null) {
-                this.offsetTable = obj.offsetTable;
+                this.setOffsetTable(obj.getOffsetTable());
                 this.dataVersion = obj.dataVersion;
             }
         }
@@ -244,7 +257,7 @@ public class ConsumerOffsetManager extends ConfigManager {
         return offsetTable;
     }
 
-    public void setOffsetTable(ConcurrentHashMap<String, 
ConcurrentMap<Integer, Long>> offsetTable) {
+    public void setOffsetTable(ConcurrentMap<String, ConcurrentMap<Integer, 
Long>> offsetTable) {
         this.offsetTable = offsetTable;
     }
 
@@ -318,7 +331,55 @@ public class ConsumerOffsetManager extends ConfigManager {
                 }
             }
         }
+    }
+
+    public void assignResetOffset(String topic, String group, int queueId, 
long offset) {
+        if (Strings.isNullOrEmpty(topic) || Strings.isNullOrEmpty(group) || 
queueId < 0 || offset < 0) {
+            LOG.warn("Illegal arguments when assigning reset offset. Topic={}, 
group={}, queueId={}, offset={}",
+                topic, group, queueId, offset);
+            return;
+        }
+
+        String key = topic + TOPIC_GROUP_SEPARATOR + group;
+        ConcurrentMap<Integer, Long> map = resetOffsetTable.get(key);
+        if (null == map) {
+            map = new ConcurrentHashMap<Integer, Long>();
+            ConcurrentMap<Integer, Long> previous = 
resetOffsetTable.putIfAbsent(key, map);
+            if (null != previous) {
+                map = previous;
+            }
+        }
 
+        map.put(queueId, offset);
+        LOG.debug("Reset offset OK. Topic={}, group={}, queueId={}, 
resetOffset={}",
+            topic, group, queueId, offset);
+
+        // Two things are important here:
+        // 1, currentOffsetMap might be null if there is no previous records;
+        // 2, Our overriding here may get overridden by the client instantly 
in concurrent cases; But it still makes
+        // sense in cases like clients are offline.
+        ConcurrentMap<Integer, Long> currentOffsetMap = offsetTable.get(key);
+        if (null != currentOffsetMap) {
+            currentOffsetMap.put(queueId, offset);
+        }
     }
 
+    public boolean hasOffsetReset(String topic, String group, int queueId) {
+        String key = topic + TOPIC_GROUP_SEPARATOR + group;
+        ConcurrentMap<Integer, Long> map = resetOffsetTable.get(key);
+        if (null == map) {
+            return false;
+        }
+        return map.containsKey(queueId);
+    }
+
+    public Long queryThenEraseResetOffset(String topic, String group, Integer 
queueId) {
+        String key = topic + TOPIC_GROUP_SEPARATOR + group;
+        ConcurrentMap<Integer, Long> map = resetOffsetTable.get(key);
+        if (null == map) {
+            return null;
+        } else {
+            return map.remove(queueId);
+        }
+    }
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java
index ec730d38b..ce70b1a82 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java
@@ -92,7 +92,7 @@ public class LmqConsumerOffsetManager extends ConsumerOffsetManager {
         if (jsonString != null) {
             LmqConsumerOffsetManager obj = 
RemotingSerializable.fromJson(jsonString, LmqConsumerOffsetManager.class);
             if (obj != null) {
-                super.offsetTable = obj.offsetTable;
+                super.setOffsetTable(obj.getOffsetTable());
                 this.lmqOffsetTable = obj.lmqOffsetTable;
             }
         }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index b7ee62c9b..a165add40 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -48,6 +48,7 @@ import org.apache.rocketmq.broker.controller.ReplicasManager;
 import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin;
 import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
 import org.apache.rocketmq.common.protocol.body.ProducerTableInfo;
+import org.apache.rocketmq.common.protocol.body.ResetOffsetBody;
 import 
org.apache.rocketmq.common.protocol.header.GetAllProducerInfoRequestHeader;
 import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
@@ -1633,6 +1634,16 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
         LOGGER.info("[reset-offset] reset offset started by {}. topic={}, 
group={}, timestamp={}, isForce={}",
             RemotingHelper.parseChannelRemoteAddr(ctx.channel()), 
requestHeader.getTopic(), requestHeader.getGroup(),
             requestHeader.getTimestamp(), requestHeader.isForce());
+
+        if 
(this.brokerController.getBrokerConfig().isUseServerSideResetOffset()) {
+            String topic = requestHeader.getTopic();
+            String group = requestHeader.getGroup();
+            int queueId = requestHeader.getQueueId();
+            long timestamp = requestHeader.getTimestamp();
+            Long offset = requestHeader.getOffset();
+            return resetOffsetInner(topic, group, queueId, timestamp, offset);
+        }
+
         boolean isC = false;
         LanguageCode language = request.getLanguage();
         switch (language) {
@@ -1644,6 +1655,98 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
             requestHeader.getTimestamp(), requestHeader.isForce(), isC);
     }
 
+    private Long searchOffsetByTimestamp(String topic, int queueId, long 
timestamp) {
+        if (timestamp < 0) {
+            return 
brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
+        } else {
+            return 
brokerController.getMessageStore().getOffsetInQueueByTime(topic, queueId, 
timestamp);
+        }
+    }
+
+    /**
+     * Reset consumer offset.
+     *
+     * @param topic         Required, not null.
+     * @param group         Required, not null.
+     * @param queueId       if target queue ID is negative, all message queues 
will be reset;
+     *                      otherwise, only the target queue would get reset.
+     * @param timestamp     if timestamp is negative, offset would be reset to 
broker offset at the time being;
+     *                      otherwise, binary search is performed to locate 
target offset.
+     * @param offset        Target offset to reset to if target queue ID is 
properly provided.
+     * @return Affected queues and their new offset
+     */
+    private RemotingCommand resetOffsetInner(String topic, String group, int 
queueId, long timestamp, Long offset) {
+        RemotingCommand response = 
RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, null);
+
+        if (BrokerRole.SLAVE == 
brokerController.getMessageStoreConfig().getBrokerRole()) {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("Can not reset offset in slave broker");
+            return response;
+        }
+
+        Map<Integer, Long> queueOffsetMap = new HashMap<>();
+
+        // Reset offset for all queues belonging to the specified topic
+        TopicConfig topicConfig = 
brokerController.getTopicConfigManager().getTopicConfigTable().get(topic);
+        if (null == topicConfig) {
+            response.setCode(ResponseCode.TOPIC_NOT_EXIST);
+            response.setRemark("Topic " + topic + " does not exist");
+            LOGGER.warn("Reset offset failed, topic does not exist. topic={}, 
group={}", topic, group);
+            return response;
+        }
+
+        if 
(!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(group))
 {
+            response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
+            response.setRemark("Group " + group + " does not exist");
+            LOGGER.warn("Reset offset failed, group does not exist. topic={}, 
group={}", topic, group);
+            return response;
+        }
+
+        if (queueId >= 0) {
+            if (null != offset && -1 != offset) {
+                long min = 
brokerController.getMessageStore().getMinOffsetInQueue(topic, queueId);
+                long max = 
brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
+                if (min >= 0 && offset < min || offset > max + 1) {
+                    response.setCode(ResponseCode.SYSTEM_ERROR);
+                    response.setRemark(
+                        String.format("Target offset %d not in consume queue 
range [%d-%d]", offset, min, max));
+                    return response;
+                }
+            } else {
+                offset = searchOffsetByTimestamp(topic, queueId, timestamp);
+            }
+            queueOffsetMap.put(queueId, offset);
+        } else {
+            for (int index = 0; index < topicConfig.getReadQueueNums(); 
index++) {
+                offset = searchOffsetByTimestamp(topic, index, timestamp);
+                queueOffsetMap.put(index, offset);
+            }
+        }
+
+        if (queueOffsetMap.isEmpty()) {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("No queues to reset.");
+            LOGGER.warn("Reset offset aborted: no queues to reset");
+            return response;
+        }
+
+        for (Map.Entry<Integer, Long> entry : queueOffsetMap.entrySet()) {
+            brokerController.getConsumerOffsetManager()
+                .assignResetOffset(topic, group, entry.getKey(), 
entry.getValue());
+        }
+
+        // Prepare reset result.
+        ResetOffsetBody body = new ResetOffsetBody();
+        String brokerName = brokerController.getBrokerConfig().getBrokerName();
+        for (Map.Entry<Integer, Long> entry : queueOffsetMap.entrySet()) {
+            body.getOffsetTable().put(new MessageQueue(topic, brokerName, 
entry.getKey()), entry.getValue());
+        }
+
+        LOGGER.info("Reset offset, topic={}, group={}, queues={}", topic, 
group, body.toJson(false));
+        response.setBody(body.encode());
+        return response;
+    }
+
     public RemotingCommand getConsumerStatus(ChannelHandlerContext ctx,
         RemotingCommand request) throws RemotingCommandException {
         final GetConsumerStatusRequestHeader requestHeader =
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
index 0959853cc..a9ceb32df 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.processor;
 import io.netty.channel.ChannelHandlerContext;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
+import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.ResponseCode;
@@ -143,27 +144,61 @@ public class ConsumerManageProcessor implements NettyRequestProcessor {
 
     private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, 
RemotingCommand request)
         throws RemotingCommandException {
+
         final RemotingCommand response =
             
RemotingCommand.createResponseCommand(UpdateConsumerOffsetResponseHeader.class);
+
         final UpdateConsumerOffsetRequestHeader requestHeader =
-            (UpdateConsumerOffsetRequestHeader) request
-                
.decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
-        TopicQueueMappingContext mappingContext = 
this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader);
+            (UpdateConsumerOffsetRequestHeader)
+                
request.decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
 
-        RemotingCommand rewriteResult  =  
rewriteRequestForStaticTopic(requestHeader, mappingContext);
+        TopicQueueMappingContext mappingContext =
+            
this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader);
+
+        RemotingCommand rewriteResult = 
rewriteRequestForStaticTopic(requestHeader, mappingContext);
         if (rewriteResult != null) {
             return rewriteResult;
         }
-        if 
(this.brokerController.getTopicConfigManager().containsTopic(requestHeader.getTopic()))
 {
-            
this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
 requestHeader.getConsumerGroup(),
-                requestHeader.getTopic(), requestHeader.getQueueId(), 
requestHeader.getCommitOffset());
-            response.setCode(ResponseCode.SUCCESS);
-            response.setRemark(null);
-        } else {
+
+        String topic = requestHeader.getTopic();
+        String group = requestHeader.getConsumerGroup();
+        Integer queueId = requestHeader.getQueueId();
+        Long offset = requestHeader.getCommitOffset();
+
+        if 
(!this.brokerController.getTopicConfigManager().containsTopic(requestHeader.getTopic()))
 {
             response.setCode(ResponseCode.TOPIC_NOT_EXIST);
-            response.setRemark("Topic " + requestHeader.getTopic() + " not 
exist!");
+            response.setRemark("Topic " + topic + " not exist!");
+            return response;
+        }
+
+        if (queueId == null) {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("QueueId is null, topic is " + topic);
+            return response;
+        }
+
+        if (offset == null) {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("Offset is null, topic is " + topic);
+            return response;
+        }
+
+        ConsumerOffsetManager consumerOffsetManager = 
brokerController.getConsumerOffsetManager();
+        if 
(this.brokerController.getBrokerConfig().isUseServerSideResetOffset()) {
+            // Note, ignoring this update offset request
+            if (consumerOffsetManager.hasOffsetReset(topic, group, queueId)) {
+                response.setCode(ResponseCode.SUCCESS);
+                response.setRemark("Offset has been previously reset");
+                LOGGER.info("Update consumer offset is rejected because of 
previous offset-reset. Group={}, " +
+                    "Topic={}, QueueId={}, Offset={}", group, topic, queueId, 
offset);
+                return response;
+            }
         }
 
+        this.brokerController.getConsumerOffsetManager().commitOffset(
+            RemotingHelper.parseChannelRemoteAddr(ctx.channel()), group, 
topic, queueId, offset);
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
         return response;
     }
 
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index 989bc3124..700ce55d7 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -66,6 +66,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.store.GetMessageResult;
 import org.apache.rocketmq.store.GetMessageStatus;
 import org.apache.rocketmq.store.MessageFilter;
+import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.config.BrokerRole;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
 
@@ -303,7 +304,8 @@ public class PullMessageProcessor implements NettyRequestProcessor {
 
         if 
(!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission()))
 {
             response.setCode(ResponseCode.NO_PERMISSION);
-            response.setRemark(String.format("the broker[%s] pulling message 
is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1()));
+            response.setRemark(String.format("the broker[%s] pulling message 
is forbidden",
+                    this.brokerController.getBrokerConfig().getBrokerIP1()));
             return response;
         }
 
@@ -462,9 +464,26 @@ public class PullMessageProcessor implements NettyRequestProcessor {
                 this.brokerController.getConsumerFilterManager());
         }
 
-        final GetMessageResult getMessageResult =
-            
this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(),
 requestHeader.getTopic(),
-                requestHeader.getQueueId(), requestHeader.getQueueOffset(), 
requestHeader.getMaxMsgNums(), messageFilter);
+        final MessageStore messageStore = brokerController.getMessageStore();
+        final boolean useResetOffsetFeature = 
brokerController.getBrokerConfig().isUseServerSideResetOffset();
+        String topic = requestHeader.getTopic();
+        String group = requestHeader.getConsumerGroup();
+        int queueId = requestHeader.getQueueId();
+        Long resetOffset = 
brokerController.getConsumerOffsetManager().queryThenEraseResetOffset(topic, 
group, queueId);
+
+        GetMessageResult getMessageResult;
+        if (useResetOffsetFeature && null != resetOffset) {
+            getMessageResult = new GetMessageResult();
+            getMessageResult.setStatus(GetMessageStatus.OFFSET_RESET);
+            getMessageResult.setNextBeginOffset(resetOffset);
+            
getMessageResult.setMinOffset(messageStore.getMinOffsetInQueue(topic, queueId));
+            
getMessageResult.setMaxOffset(messageStore.getMaxOffsetInQueue(topic, queueId));
+            getMessageResult.setSuggestPullingFromSlave(false);
+        } else {
+            getMessageResult = messageStore.getMessage(
+                group, topic, queueId, requestHeader.getQueueOffset(), 
requestHeader.getMaxMsgNums(), messageFilter);
+        }
+
         if (getMessageResult != null) {
             response.setRemark(getMessageResult.getStatus().name());
             
responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
@@ -512,6 +531,11 @@ public class PullMessageProcessor implements NettyRequestProcessor {
                 case OFFSET_OVERFLOW_ONE:
                     response.setCode(ResponseCode.PULL_NOT_FOUND);
                     break;
+                case OFFSET_RESET:
+                    response.setCode(ResponseCode.PULL_OFFSET_MOVED);
+                    LOGGER.info("The queue under pulling was previously reset 
to start from {}",
+                        getMessageResult.getNextBeginOffset());
+                    break;
                 case OFFSET_TOO_SMALL:
                     response.setCode(ResponseCode.PULL_OFFSET_MOVED);
                     LOGGER.info("the request offset too small. group={}, 
topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
index b43579fae..e9aaba388 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
@@ -20,6 +20,7 @@ import java.util.Iterator;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.BrokerPathConfigHelper;
 import org.apache.rocketmq.common.ConfigManager;
@@ -251,6 +252,7 @@ public class SubscriptionGroupManager extends ConfigManager {
         }
     }
 
+    @Override
     public String encode(final boolean prettyFormat) {
         return RemotingSerializable.toJson(this, prettyFormat);
     }
@@ -294,4 +296,12 @@ public class SubscriptionGroupManager extends ConfigManager {
             this.subscriptionGroupTable.put(key, 
otherSubscriptionGroupTable.get(key));
         }
     }
+
+    public boolean containsSubscriptionGroup(String group) {
+        if (StringUtils.isBlank(group)) {
+            return false;
+        }
+
+        return subscriptionGroupTable.containsKey(group);
+    }
 }
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
index 4374164da..7bd289a6f 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
@@ -17,22 +17,34 @@
 
 package org.apache.rocketmq.broker.offset;
 
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import org.mockito.Mockito;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
 public class ConsumerOffsetManagerTest {
 
+    private static final String KEY = "FooBar@FooBarGroup";
+
+    private BrokerController brokerController;
+
     private ConsumerOffsetManager consumerOffsetManager;
 
-    private static final String KEY = "FooBar@FooBarGroup";
     @Before
     public void init() {
-        consumerOffsetManager = new ConsumerOffsetManager();
+        brokerController = Mockito.mock(BrokerController.class);
+        consumerOffsetManager = new ConsumerOffsetManager(brokerController);
+
+        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+        
Mockito.when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
+
         ConcurrentHashMap<String, ConcurrentMap<Integer, Long>> offsetTable = 
new ConcurrentHashMap<>(512);
         offsetTable.put(KEY,new ConcurrentHashMap<Integer, Long>() {{
                 put(1,2L);
@@ -52,4 +64,21 @@ public class ConsumerOffsetManagerTest {
         consumerOffsetManager.cleanOffsetByTopic("FooBar");
         
assertThat(!consumerOffsetManager.getOffsetTable().containsKey(KEY)).isTrue();
     }
+
+    @Test
+    public void testOffsetPersistInMemory() {
+        ConcurrentMap<String, ConcurrentMap<Integer, Long>> offsetTable = 
consumerOffsetManager.getOffsetTable();
+        ConcurrentMap<Integer, Long> table = new ConcurrentHashMap<>();
+        table.put(0, 1L);
+        table.put(1, 3L);
+        String group = "G1";
+        offsetTable.put(group, table);
+
+        consumerOffsetManager.persist();
+        ConsumerOffsetManager manager = new 
ConsumerOffsetManager(brokerController);
+        manager.load();
+
+        ConcurrentMap<Integer, Long> offsetTableLoaded = 
manager.getOffsetTable().get(group);
+        Assert.assertEquals(table, offsetTableLoaded);
+    }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 145a44d63..b327ee28b 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -2052,6 +2052,40 @@ public class MQClientAPIImpl implements NameServerUpdateCallback {
         return invokeBrokerToResetOffset(addr, topic, group, timestamp, 
isForce, timeoutMillis, false);
     }
 
+    public Map<MessageQueue, Long> invokeBrokerToResetOffset(final String 
addr, final String topic, final String group,
+        final long timestamp, int queueId, Long offset, final long 
timeoutMillis)
+        throws RemotingException, MQClientException, InterruptedException {
+
+        ResetOffsetRequestHeader requestHeader = new 
ResetOffsetRequestHeader();
+        requestHeader.setTopic(topic);
+        requestHeader.setGroup(group);
+        requestHeader.setQueueId(queueId);
+        requestHeader.setTimestamp(timestamp);
+        requestHeader.setOffset(offset);
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_RESET_OFFSET,
+            requestHeader);
+
+        RemotingCommand response = remotingClient.invokeSync(
+            MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), 
addr), request, timeoutMillis);
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                if (null != response.getBody()) {
+                    return ResetOffsetBody.decode(response.getBody(), 
ResetOffsetBody.class).getOffsetTable();
+                }
+                break;
+            }
+            case ResponseCode.TOPIC_NOT_EXIST:
+            case ResponseCode.SUBSCRIPTION_NOT_EXIST:
+            case ResponseCode.SYSTEM_ERROR:
+                log.warn("Invoke broker to reset offset error code={}, 
remark={}",
+                    response.getCode(), response.getRemark());
+                break;
+            default:
+                break;
+        }
+        throw new MQClientException(response.getCode(), response.getRemark());
+    }
+
     public Map<MessageQueue, Long> invokeBrokerToResetOffset(final String 
addr, final String topic, final String group,
         final long timestamp, final boolean isForce, final long timeoutMillis, 
boolean isC)
         throws RemotingException, MQClientException, InterruptedException {
@@ -2060,6 +2094,8 @@ public class MQClientAPIImpl implements NameServerUpdateCallback {
         requestHeader.setGroup(group);
         requestHeader.setTimestamp(timestamp);
         requestHeader.setForce(isForce);
+        // offset is -1 means offset is null
+        requestHeader.setOffset(-1L);
 
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_RESET_OFFSET, 
requestHeader);
         if (isC) {
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index fc49428bb..854ef6334 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -291,6 +291,8 @@ public class BrokerConfig extends BrokerIdentity {
 
     private boolean asyncSendEnable = true;
 
+    private boolean useServerSideResetOffset = false;
+
     private long consumerOffsetUpdateVersionStep = 500;
 
     private long delayOffsetUpdateVersionStep = 200;
@@ -1356,4 +1358,12 @@ public class BrokerConfig extends BrokerIdentity {
     public void setFetchNameSrvAddrByDnsLookup(boolean 
fetchNameSrvAddrByDnsLookup) {
         this.fetchNameSrvAddrByDnsLookup = fetchNameSrvAddrByDnsLookup;
     }
+
+    public boolean isUseServerSideResetOffset() {
+        return useServerSideResetOffset;
+    }
+
+    public void setUseServerSideResetOffset(boolean useServerSideResetOffset) {
+        this.useServerSideResetOffset = useServerSideResetOffset;
+    }
 }
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ResetOffsetBody.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ResetOffsetBody.java
index b28e74b56..d2a97f893 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ResetOffsetBody.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ResetOffsetBody.java
@@ -17,13 +17,19 @@
 
 package org.apache.rocketmq.common.protocol.body;
 
+import java.util.HashMap;
 import java.util.Map;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
 public class ResetOffsetBody extends RemotingSerializable {
+
     private Map<MessageQueue, Long> offsetTable;
 
+    public ResetOffsetBody() {
+        offsetTable = new HashMap<>();
+    }
+
     public Map<MessageQueue, Long> getOffsetTable() {
         return offsetTable;
     }
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ResetOffsetRequestHeader.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ResetOffsetRequestHeader.java
index c3bfa2189..78be60a76 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ResetOffsetRequestHeader.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ResetOffsetRequestHeader.java
@@ -22,12 +22,20 @@ import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
 public class ResetOffsetRequestHeader implements CommandCustomHeader {
+
     @CFNotNull
     private String topic;
+
     @CFNotNull
     private String group;
+
+    private int queueId = -1;
+
+    private Long offset;
+
     @CFNotNull
     private long timestamp;
+
     @CFNotNull
     private boolean isForce;
 
@@ -63,6 +71,22 @@ public class ResetOffsetRequestHeader implements 
CommandCustomHeader {
         this.isForce = isForce;
     }
 
+    public int getQueueId() {
+        return queueId;
+    }
+
+    public void setQueueId(int queueId) {
+        this.queueId = queueId;
+    }
+
+    public Long getOffset() {
+        return offset;
+    }
+
+    public void setOffset(Long offset) {
+        this.offset = offset;
+    }
+
     @Override
     public void checkFields() throws RemotingCommandException {
 
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/GetMessageStatus.java 
b/store/src/main/java/org/apache/rocketmq/store/GetMessageStatus.java
index 6a824b898..bc244865f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/GetMessageStatus.java
+++ b/store/src/main/java/org/apache/rocketmq/store/GetMessageStatus.java
@@ -35,4 +35,6 @@ public enum GetMessageStatus {
     NO_MATCHED_LOGIC_QUEUE,
 
     NO_MESSAGE_IN_QUEUE,
+
+    OFFSET_RESET
 }
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/listener/AbstractListener.java 
b/test/src/main/java/org/apache/rocketmq/test/listener/AbstractListener.java
index c2a3891dd..10eedd1b9 100644
--- a/test/src/main/java/org/apache/rocketmq/test/listener/AbstractListener.java
+++ b/test/src/main/java/org/apache/rocketmq/test/listener/AbstractListener.java
@@ -19,7 +19,6 @@ package org.apache.rocketmq.test.listener;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import org.apache.log4j.Logger;
@@ -62,54 +61,40 @@ public class AbstractListener extends MQCollector 
implements MessageListener {
         super.lockCollectors();
     }
 
-    public Collection<Object> waitForMessageConsume(Collection<Object> 
allSendMsgs,
-        int timeoutMills) {
-        this.allSendMsgs = allSendMsgs;
-        List<Object> sendMsgs = new ArrayList<Object>();
-        sendMsgs.addAll(allSendMsgs);
+    public Collection<Object> waitForMessageConsume(Collection<Object> 
allSendMessages, int timeoutMills) {
+        this.allSendMsgs = allSendMessages;
+        List<Object> sendMessages = new ArrayList<>(allSendMessages);
 
         long curTime = System.currentTimeMillis();
-        while (!sendMsgs.isEmpty()) {
-            Iterator<Object> iter = sendMsgs.iterator();
-            while (iter.hasNext()) {
-                Object msg = iter.next();
-                if (msgBodys.getAllData().contains(msg)) {
-                    iter.remove();
-                }
-            }
-            if (sendMsgs.isEmpty()) {
+        while (!sendMessages.isEmpty()) {
+            sendMessages.removeIf(msg -> msgBodys.getAllData().contains(msg));
+            if (sendMessages.isEmpty()) {
                 break;
             } else {
                 if (System.currentTimeMillis() - curTime >= timeoutMills) {
-                    LOGGER.error(String.format("timeout but  [%s]  not recv 
all send messages!",
-                        listenerName));
+                    LOGGER.error(String.format("timeout but [%s] not recv all 
send messages!", listenerName));
                     break;
                 } else {
-                    LOGGER.info(String.format("[%s] still [%s] msg not recv!", 
listenerName,
-                        sendMsgs.size()));
+                    LOGGER.info(String.format("[%s] still [%s] msg not recv!", 
listenerName, sendMessages.size()));
                     TestUtil.waitForMonment(500);
                 }
             }
         }
-
-        return sendMsgs;
+        return sendMessages;
     }
 
-    public long waitForMessageConsume(int size,
-        int timeoutMills) {
-
+    public long waitForMessageConsume(int size, int timeoutMills) {
         long curTime = System.currentTimeMillis();
         while (true) {
             if (msgBodys.getDataSize() >= size) {
                 break;
             }
             if (System.currentTimeMillis() - curTime >= timeoutMills) {
-                LOGGER.error(String.format("timeout but  [%s]  not recv all 
send messages!",
-                    listenerName));
+                LOGGER.error(String.format("timeout but  [%s]  not recv all 
send messages!", listenerName));
                 break;
             } else {
-                LOGGER.info(String.format("[%s] still [%s] msg not recv!", 
listenerName,
-                    size - msgBodys.getDataSize()));
+                LOGGER.info(String.format("[%s] still [%s] msg not recv!",
+                    listenerName, size - msgBodys.getDataSize()));
                 TestUtil.waitForMonment(500);
             }
         }
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQNormalListener.java
 
b/test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQNormalListener.java
index 908aed1be..678216810 100644
--- 
a/test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQNormalListener.java
+++ 
b/test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQNormalListener.java
@@ -27,8 +27,10 @@ import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.test.listener.AbstractListener;
 
 public class RMQNormalListener extends AbstractListener implements 
MessageListenerConcurrently {
+
     private ConsumeConcurrentlyStatus consumeStatus = 
ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-    private AtomicInteger msgIndex = new AtomicInteger(0);
+
+    private final AtomicInteger msgIndex = new AtomicInteger(0);
 
     public RMQNormalListener() {
         super();
@@ -47,6 +49,11 @@ public class RMQNormalListener extends AbstractListener 
implements MessageListen
         super(originMsgCollector, msgBodyCollector);
     }
 
+    public AtomicInteger getMsgIndex() {
+        return msgIndex;
+    }
+
+    @Override
     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
         ConsumeConcurrentlyContext consumeConcurrentlyContext) {
         for (MessageExt msg : msgs) {
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java 
b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
index 818bdbd65..035a8be68 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
@@ -57,25 +57,31 @@ import org.slf4j.LoggerFactory;
 import static org.awaitility.Awaitility.await;
 
 public class BaseConf {
+
+    private final static Logger log = LoggerFactory.getLogger(BaseConf.class);
+
     public final static String NAMESRV_ADDR;
+
+    // the logic queue test needs at least three brokers
+    protected final static String CLUSTER_NAME;
     protected final static String BROKER1_NAME;
     protected final static String BROKER2_NAME;
-    //the logic queue test need at least three brokers
     protected final static String BROKER3_NAME;
-    protected final static String CLUSTER_NAME;
+
     protected final static int BROKER_NUM = 3;
     protected final static int WAIT_TIME = 5;
     protected final static int CONSUME_TIME = 2 * 60 * 1000;
     protected final static int QUEUE_NUMBERS = 8;
+
     protected static NamesrvController namesrvController;
     protected static BrokerController brokerController1;
     protected static BrokerController brokerController2;
     protected static BrokerController brokerController3;
     protected static List<BrokerController> brokerControllerList;
     protected static Map<String, BrokerController> brokerControllerMap;
+
     protected static List<Object> mqClients = new ArrayList<Object>();
     protected static boolean debug = false;
-    private final static Logger log = LoggerFactory.getLogger(BaseConf.class);
 
     static {
         System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, 
Integer.toString(MQVersion.CURRENT_VERSION));
@@ -100,7 +106,8 @@ public class BaseConf {
         BROKER2_NAME = brokerController2.getBrokerConfig().getBrokerName();
         BROKER3_NAME = brokerController3.getBrokerConfig().getBrokerName();
         brokerControllerList = ImmutableList.of(brokerController1, 
brokerController2, brokerController3);
-        brokerControllerMap = 
brokerControllerList.stream().collect(Collectors.toMap(input -> 
input.getBrokerConfig().getBrokerName(), Function.identity()));
+        brokerControllerMap = brokerControllerList.stream().collect(
+            Collectors.toMap(input -> input.getBrokerConfig().getBrokerName(), 
Function.identity()));
     }
 
     public BaseConf() {
@@ -203,7 +210,7 @@ public class BaseConf {
     }
 
     public static DefaultMQAdminExt getAdmin(String nsAddr) {
-        final DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt(500);
+        final DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt(3 * 1000);
         mqAdminExt.setNamesrvAddr(nsAddr);
         mqAdminExt.setPollNameServerInterval(100);
         mqClients.add(mqAdminExt);
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/offset/OffsetResetIT.java 
b/test/src/test/java/org/apache/rocketmq/test/offset/OffsetResetIT.java
new file mode 100644
index 000000000..edf7b4d0d
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/offset/OffsetResetIT.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.offset;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.admin.ConsumeStats;
+import org.apache.rocketmq.common.admin.OffsetWrapper;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
+import org.apache.rocketmq.test.message.MessageQueueMsg;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+
+import static org.awaitility.Awaitility.await;
+
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class OffsetResetIT extends BaseConf {
+
+    private static final Logger LOGGER = Logger.getLogger(OffsetResetIT.class);
+
+    private RMQNormalListener listener = null;
+    private RMQNormalProducer producer = null;
+    private RMQNormalConsumer consumer = null;
+    private DefaultMQAdminExt defaultMQAdminExt = null;
+    private String topic = null;
+
+    @Before
+    public void init() throws MQClientException {
+        topic = initTopic();
+        LOGGER.info(String.format("use topic: %s;", topic));
+
+        for (BrokerController controller : brokerControllerList) {
+            controller.getBrokerConfig().setLongPollingEnable(false);
+            controller.getBrokerConfig().setShortPollingTimeMills(500);
+            controller.getBrokerConfig().setUseServerSideResetOffset(true);
+        }
+
+        listener = new RMQNormalListener();
+        producer = getProducer(NAMESRV_ADDR, topic);
+        consumer = getConsumer(NAMESRV_ADDR, topic, "*", listener);
+
+        defaultMQAdminExt = BaseConf.getAdmin(NAMESRV_ADDR);
+        defaultMQAdminExt.start();
+    }
+
+    @After
+    public void tearDown() {
+        shutdown();
+    }
+
+    @Test
+    public void testEncodeOffsetHeader() {
+        ResetOffsetRequestHeader requestHeader = new 
ResetOffsetRequestHeader();
+        requestHeader.setTopic(topic);
+        requestHeader.setGroup(consumer.getConsumerGroup());
+        requestHeader.setTimestamp(System.currentTimeMillis());
+        requestHeader.setForce(false);
+        
RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_RESET_OFFSET, 
requestHeader);
+    }
+
+    /**
+     * use mq admin tool to query remote offset
+     */
+    private long getConsumerLag(String topic, String group) throws Exception {
+        long consumerLag = 0L;
+        for (BrokerController controller : brokerControllerList) {
+            ConsumeStats consumeStats = 
defaultMQAdminExt.getDefaultMQAdminExtImpl()
+                .getMqClientInstance().getMQClientAPIImpl()
+                .getConsumeStats(controller.getBrokerAddr(), group, topic, 
3000);
+            Map<MessageQueue, OffsetWrapper> offsetTable = 
consumeStats.getOffsetTable();
+
+            for (Map.Entry<MessageQueue, OffsetWrapper> entry : 
offsetTable.entrySet()) {
+                MessageQueue messageQueue = entry.getKey();
+                OffsetWrapper offsetWrapper = entry.getValue();
+
+                Assert.assertEquals(messageQueue.getBrokerName(), 
controller.getBrokerConfig().getBrokerName());
+                long brokerOffset = 
controller.getMessageStore().getMaxOffsetInQueue(topic, 
messageQueue.getQueueId());
+                long consumerOffset = 
controller.getConsumerOffsetManager().queryOffset(
+                    consumer.getConsumerGroup(), topic, 
messageQueue.getQueueId());
+                Assert.assertEquals(brokerOffset, 
offsetWrapper.getBrokerOffset());
+                Assert.assertEquals(consumerOffset, 
offsetWrapper.getConsumerOffset());
+
+                consumerLag += brokerOffset - consumerOffset;
+            }
+        }
+        return consumerLag;
+    }
+
+    @Test
+    public void testResetOffsetSingleQueue() throws Exception {
+        int msgSize = 100;
+        List<MessageQueue> mqs = producer.getMessageQueue();
+        MessageQueueMsg messageQueueMsg = new MessageQueueMsg(mqs, msgSize);
+
+        producer.send(messageQueueMsg.getMsgsWithMQ());
+        consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 
CONSUME_TIME);
+
+        
await().pollInterval(Duration.ofSeconds(1)).atMost(Duration.ofMinutes(3)).until(
+            () -> 0L == this.getConsumerLag(topic, 
consumer.getConsumerGroup()));
+
+        for (BrokerController controller : brokerControllerList) {
+            defaultMQAdminExt.resetOffsetByQueueId(controller.getBrokerAddr(),
+                consumer.getConsumerGroup(), consumer.getTopic(), 3, 0);
+        }
+
+        int hasConsumeBefore = listener.getMsgIndex().get();
+        int expectAfterReset = brokerControllerList.size() * msgSize;
+        
await().pollInterval(Duration.ofSeconds(1)).atMost(Duration.ofMinutes(3)).until(()
 -> {
+            long receive = listener.getMsgIndex().get();
+            long expect = hasConsumeBefore + expectAfterReset;
+            return receive >= expect;
+        });
+    }
+
+    @Test
+    public void testResetOffsetTotal() throws Exception {
+        int msgSize = 100;
+        long start = System.currentTimeMillis();
+        List<MessageQueue> mqs = producer.getMessageQueue();
+        MessageQueueMsg messageQueueMsg = new MessageQueueMsg(mqs, msgSize);
+
+        producer.send(messageQueueMsg.getMsgsWithMQ());
+        consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 
CONSUME_TIME);
+
+        
await().pollInterval(Duration.ofSeconds(1)).atMost(Duration.ofMinutes(3)).until(
+            () -> 0L == this.getConsumerLag(topic, 
consumer.getConsumerGroup()));
+
+        for (BrokerController controller : brokerControllerList) {
+            
defaultMQAdminExt.getDefaultMQAdminExtImpl().getMqClientInstance().getMQClientAPIImpl()
+                .invokeBrokerToResetOffset(controller.getBrokerAddr(),
+                    consumer.getTopic(), consumer.getConsumerGroup(), start, 
true, 3 * 1000);
+        }
+
+        int hasConsumeBefore = listener.getMsgIndex().get();
+        int expectAfterReset = mqs.size() * msgSize;
+        
await().pollInterval(Duration.ofSeconds(1)).atMost(Duration.ofMinutes(3)).until(()
 -> {
+            long receive = listener.getMsgIndex().get();
+            long expect = hasConsumeBefore + expectAfterReset;
+            return receive >= expect;
+        });
+    }
+}
\ No newline at end of file
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 06503962e..9f0fd4043 100644
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.tools.admin;
 
+import com.alibaba.fastjson.JSON;
 import java.io.UnsupportedEncodingException;
 import java.text.MessageFormat;
 import java.util.ArrayList;
@@ -1780,6 +1781,18 @@ public class DefaultMQAdminExtImpl implements 
MQAdminExt, MQAdminExtInner {
         requestHeader.setQueueId(queueId);
         requestHeader.setCommitOffset(resetOffset);
         
this.mqClientInstance.getMQClientAPIImpl().updateConsumerOffset(brokerAddr, 
requestHeader, timeoutMillis);
+        try {
+            Map<MessageQueue, Long> result = 
mqClientInstance.getMQClientAPIImpl()
+                .invokeBrokerToResetOffset(brokerAddr, topicName, 
consumeGroup, 0, queueId, resetOffset, timeoutMillis);
+            if (null != result) {
+                for (Map.Entry<MessageQueue, Long> entry : result.entrySet()) {
+                    log.info("Reset single message queue {} offset from {} to 
{}",
+                        JSON.toJSONString(entry.getKey()), entry.getValue(), 
resetOffset);
+                }
+            }
+        } catch (MQClientException e) {
+            throw new MQBrokerException(e.getResponseCode(), e.getMessage());
+        }
     }
 
     @Override
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java
index f95c7e514..9c7b7ad9d 100644
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java
@@ -17,8 +17,8 @@
 
 package org.apache.rocketmq.tools.command.offset;
 
-import java.util.Iterator;
 import java.util.Map;
+import java.util.Objects;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
@@ -57,11 +57,11 @@ public class ResetOffsetByTimeCommand implements SubCommand 
{
         opt.setRequired(true);
         options.addOption(opt);
 
-        opt = new Option("f", "force", true, "set the force rollback by 
timestamp switch[true|false]");
+        opt = new Option("f", "force", true, "set the force rollback by 
timestamp switch[true|false]. Deprecated.");
         opt.setRequired(false);
         options.addOption(opt);
 
-        opt = new Option("c", "cplus", false, "reset c++ client offset");
+        opt = new Option("c", "cplus", false, "reset c++ client offset. 
Deprecated.");
         opt.setRequired(false);
         options.addOption(opt);
 
@@ -73,6 +73,10 @@ public class ResetOffsetByTimeCommand implements SubCommand {
         opt.setRequired(false);
         options.addOption(opt);
 
+        opt = new Option("o", "offset", true, "Expect queue offset, not 
support old version broker");
+        opt.setRequired(false);
+        options.addOption(opt);
+
         return options;
     }
 
@@ -84,26 +88,23 @@ public class ResetOffsetByTimeCommand implements SubCommand 
{
             String group = commandLine.getOptionValue("g").trim();
             String topic = commandLine.getOptionValue("t").trim();
             String timeStampStr = commandLine.getOptionValue("s").trim();
-            long timestamp = timeStampStr.equals("now") ? -1 : 0;
+            long timestamp = "now".equals(timeStampStr) ? 
System.currentTimeMillis() : 0;
 
             try {
                 if (timestamp == 0) {
                     timestamp = Long.parseLong(timeStampStr);
                 }
             } catch (NumberFormatException e) {
-
-                timestamp = UtilAll.parseDate(timeStampStr, 
UtilAll.YYYY_MM_DD_HH_MM_SS_SSS).getTime();
+                timestamp = Objects.requireNonNull(
+                    UtilAll.parseDate(timeStampStr, 
UtilAll.YYYY_MM_DD_HH_MM_SS_SSS)).getTime();
             }
 
             boolean force = true;
             if (commandLine.hasOption('f')) {
-                force = 
Boolean.valueOf(commandLine.getOptionValue("f").trim());
+                force = 
Boolean.parseBoolean(commandLine.getOptionValue("f").trim());
             }
 
-            boolean isC = false;
-            if (commandLine.hasOption('c')) {
-                isC = true;
-            }
+            boolean isC = commandLine.hasOption('c');
 
             String brokerAddr = null;
             if (commandLine.hasOption('b')) {
@@ -118,19 +119,24 @@ public class ResetOffsetByTimeCommand implements 
SubCommand {
                 
defaultMQAdminExt.setNamesrvAddr(commandLine.getOptionValue('n').trim());
             }
 
+            Long offset = null;
+            if (commandLine.hasOption('o')) {
+                offset = Long.parseLong(commandLine.getOptionValue('o'));
+            }
+
             defaultMQAdminExt.start();
 
-            if (brokerAddr != null && queueId > -1) {
-                System.out.printf("rollback consumer offset by specified 
group[%s], topic[%s], queueId[%s], broker[%s], timestamp(string)[%s], 
timestamp(long)[%s]%n",
+            if (brokerAddr != null && queueId >= 0) {
+                System.out.printf("start reset consumer offset by specified, " 
+
+                        "group[%s], topic[%s], queueId[%s], broker[%s], 
timestamp(string)[%s], timestamp(long)[%s]%n",
                         group, topic, queueId, brokerAddr, timeStampStr, 
timestamp);
-                try {
-                    long resetOffset = 
defaultMQAdminExt.searchOffset(brokerAddr, topic, queueId, timestamp, 3000);
-                    System.out.printf("Rollback Offset is: %s", resetOffset);
-                    if (resetOffset > 0) {
-                        defaultMQAdminExt.resetOffsetByQueueId(brokerAddr, 
group, topic, queueId, resetOffset);
-                    }
-                } catch (Throwable e) {
-                    throw e;
+
+                long resetOffset = null != offset ? offset :
+                    defaultMQAdminExt.searchOffset(brokerAddr, topic, queueId, 
timestamp, 3000);
+
+                System.out.printf("reset consumer offset to %d%n", 
resetOffset);
+                if (resetOffset > 0) {
+                    defaultMQAdminExt.resetOffsetByQueueId(brokerAddr, group, 
topic, queueId, resetOffset);
                 }
                 return;
             }
@@ -139,6 +145,7 @@ public class ResetOffsetByTimeCommand implements SubCommand 
{
             try {
                 offsetTable = defaultMQAdminExt.resetOffsetByTimestamp(topic, 
group, timestamp, force, isC);
             } catch (MQClientException e) {
+                // if consumer not online, use old command to reset offset
                 if (ResponseCode.CONSUMER_NOT_ONLINE == e.getResponseCode()) {
                     ResetOffsetByTimeOldCommand.resetOffset(defaultMQAdminExt, 
group, topic, timestamp, force, timeStampStr);
                     return;
@@ -146,17 +153,13 @@ public class ResetOffsetByTimeCommand implements 
SubCommand {
                 throw e;
             }
 
-            System.out.printf("rollback consumer offset by specified 
group[%s], topic[%s], force[%s], timestamp(string)[%s], timestamp(long)[%s]%n",
+            System.out.printf("start reset consumer offset by specified, " +
+                    "group[%s], topic[%s], force[%s], timestamp(string)[%s], 
timestamp(long)[%s]%n",
                 group, topic, force, timeStampStr, timestamp);
 
-            System.out.printf("%-40s  %-40s  %-40s%n",
-                "#brokerName",
-                "#queueId",
-                "#offset");
+            System.out.printf("%-40s  %-40s  %-40s%n", "#brokerName", 
"#queueId", "#offset");
 
-            Iterator<Map.Entry<MessageQueue, Long>> iterator = 
offsetTable.entrySet().iterator();
-            while (iterator.hasNext()) {
-                Map.Entry<MessageQueue, Long> entry = iterator.next();
+            for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) 
{
                 System.out.printf("%-40s  %-40d  %-40d%n",
                     UtilAll.frontStringAtLeast(entry.getKey().getBrokerName(), 
32),
                     entry.getKey().getQueueId(),
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommand.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommand.java
index 0c02d8f79..d86c3cf71 100644
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommand.java
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommand.java
@@ -33,12 +33,16 @@ import org.apache.rocketmq.tools.command.SubCommand;
 import org.apache.rocketmq.tools.command.SubCommandException;
 
 public class ResetOffsetByTimeOldCommand implements SubCommand {
+
     public static void resetOffset(DefaultMQAdminExt defaultMQAdminExt, String 
consumerGroup, String topic,
-        long timestamp, boolean force,
-        String timeStampStr) throws RemotingException, MQBrokerException, 
InterruptedException, MQClientException {
-        List<RollbackStats> rollbackStatsList = 
defaultMQAdminExt.resetOffsetByTimestampOld(consumerGroup, topic, timestamp, 
force);
-        System.out.printf(
-            "rollback consumer offset by specified consumerGroup[%s], 
topic[%s], force[%s], timestamp(string)[%s], timestamp(long)[%s]%n",
+        long timestamp, boolean force, String timeStampStr)
+        throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
+
+        List<RollbackStats> rollbackStatsList =
+            defaultMQAdminExt.resetOffsetByTimestampOld(consumerGroup, topic, 
timestamp, force);
+
+        System.out.printf("reset consumer offset by specified " +
+                "consumerGroup[%s], topic[%s], force[%s], 
timestamp(string)[%s], timestamp(long)[%s]%n",
             consumerGroup, topic, force, timeStampStr, timestamp);
 
         System.out.printf("%-20s  %-20s  %-20s  %-20s  %-20s  %-20s%n",
@@ -47,7 +51,7 @@ public class ResetOffsetByTimeOldCommand implements 
SubCommand {
             "#brokerOffset",
             "#consumerOffset",
             "#timestampOffset",
-            "#rollbackOffset"
+            "#resetOffset"
         );
 
         for (RollbackStats rollbackStats : rollbackStatsList) {
@@ -115,7 +119,7 @@ public class ResetOffsetByTimeOldCommand implements 
SubCommand {
 
                 boolean force = true;
                 if (commandLine.hasOption('f')) {
-                    force = 
Boolean.valueOf(commandLine.getOptionValue("f").trim());
+                    force = 
Boolean.parseBoolean(commandLine.getOptionValue("f").trim());
                 }
 
                 defaultMQAdminExt.start();

Reply via email to