This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new e5e38396ba [ISSUE #8591] Preliminary support for key commands of LMQ
(#8590)
e5e38396ba is described below
commit e5e38396ba32293b3bd40a5a40ff402d42dce928
Author: rongtong <[email protected]>
AuthorDate: Fri Aug 30 14:15:03 2024 +0800
[ISSUE #8591] Preliminary support for key commands of LMQ (#8590)
* Preliminary support for key commands of LMQ
* Preliminary support for key commands of LMQ
* Optimize some code
* Fix some bugs and UTs for lmq support
* Fix UTs can not pass
* Fix UTs can not pass
* Add some check to prevent NPE
---
.../broker/processor/AdminBrokerProcessor.java | 2 +-
.../apache/rocketmq/client/impl/MQAdminImpl.java | 47 ++++--
.../rocketmq/client/impl/MQAdminImplTest.java | 2 +-
.../example/{simple => lmq}/LMQProducer.java | 3 +-
.../example/{simple => lmq}/LMQPullConsumer.java | 2 +-
.../example/{simple => lmq}/LMQPushConsumer.java | 2 +-
.../{simple => lmq}/LMQPushPopConsumer.java | 2 +-
.../rocketmq/tools/admin/DefaultMQAdminExt.java | 85 ++++++++---
.../tools/admin/DefaultMQAdminExtImpl.java | 157 +++++++++++++++------
.../apache/rocketmq/tools/admin/MQAdminExt.java | 12 ++
.../consumer/ConsumerProgressSubCommand.java | 8 +-
.../command/message/QueryMsgByIdSubCommand.java | 29 ++--
.../command/message/QueryMsgByKeySubCommand.java | 25 +++-
.../message/QueryMsgByUniqueKeySubCommand.java | 28 ++--
.../command/offset/ResetOffsetByTimeCommand.java | 13 +-
.../offset/ResetOffsetByTimeOldCommand.java | 13 +-
.../command/offset/SkipAccumulationSubCommand.java | 7 +-
.../tools/command/topic/TopicStatusSubCommand.java | 24 +++-
.../message/QueryMsgByUniqueKeySubCommandTest.java | 12 +-
19 files changed, 348 insertions(+), 125 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 3039cf5c97..28bd254914 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -2062,7 +2062,7 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
Map<Integer, Long> queueOffsetMap = new HashMap<>();
// Reset offset for all queues belonging to the specified topic
- TopicConfig topicConfig =
brokerController.getTopicConfigManager().getTopicConfigTable().get(topic);
+ TopicConfig topicConfig =
brokerController.getTopicConfigManager().selectTopicConfig(topic);
if (null == topicConfig) {
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark("Topic " + topic + " does not exist");
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
index bcfe29bd4f..c1e3ee33dc 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
@@ -28,6 +28,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.exception.MQBrokerException;
@@ -43,6 +44,7 @@ import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageId;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.common.utils.NetworkUtil;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -199,7 +201,7 @@ public class MQAdminImpl {
if (brokerAddr != null) {
try {
return
this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq,
timestamp,
- boundaryType, timeoutMillis);
+ boundaryType, timeoutMillis);
} catch (Exception e) {
throw new MQClientException("Invoke Broker[" + brokerAddr + "]
exception", e);
}
@@ -277,13 +279,20 @@ public class MQAdminImpl {
public QueryResult queryMessage(String topic, String key, int maxNum, long
begin,
long end) throws MQClientException,
InterruptedException {
- return queryMessage(topic, key, maxNum, begin, end, false);
+ return queryMessage(null, topic, key, maxNum, begin, end, false);
}
public QueryResult queryMessageByUniqKey(String topic, String uniqKey, int
maxNum, long begin, long end)
throws MQClientException, InterruptedException {
- return queryMessage(topic, uniqKey, maxNum, begin, end, true);
+ return queryMessage(null, topic, uniqKey, maxNum, begin, end, true);
+ }
+
+ public QueryResult queryMessageByUniqKey(String clusterName, String topic,
String uniqKey, int maxNum, long begin,
+ long end)
+ throws MQClientException, InterruptedException {
+
+ return queryMessage(clusterName, topic, uniqKey, maxNum, begin, end,
true);
}
public MessageExt queryMessageByUniqKey(String topic,
@@ -311,25 +320,29 @@ public class MQAdminImpl {
}
}
- protected QueryResult queryMessage(String topic, String key, int maxNum,
long begin, long end,
+ public QueryResult queryMessage(String clusterName, String topic, String
key, int maxNum, long begin, long end,
boolean isUniqKey) throws MQClientException,
InterruptedException {
- return queryMessage(null, topic, key, maxNum, begin, end, isUniqKey);
- }
+ boolean isLmq = MixAll.isLmq(topic);
+
+ String routeTopic = topic;
+ // if topic is lmq ,then use clusterName as lmq parent topic
+ // Use clusterName or lmq parent topic to get topic route for lmq or
rmq_sys_wheel_timer
+ if (!StringUtils.isEmpty(topic) && (isLmq ||
topic.equals(TopicValidator.SYSTEM_TOPIC_PREFIX + "wheel_timer"))
+ && !StringUtils.isEmpty(clusterName)) {
+ routeTopic = clusterName;
+ }
- protected QueryResult queryMessage(String clusterName, String topic,
String key, int maxNum, long begin, long end,
- boolean isUniqKey) throws MQClientException,
- InterruptedException {
- TopicRouteData topicRouteData =
this.mQClientFactory.getAnExistTopicRouteData(topic);
+ TopicRouteData topicRouteData =
this.mQClientFactory.getAnExistTopicRouteData(routeTopic);
if (null == topicRouteData) {
- this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
- topicRouteData =
this.mQClientFactory.getAnExistTopicRouteData(topic);
+
this.mQClientFactory.updateTopicRouteInfoFromNameServer(routeTopic);
+ topicRouteData =
this.mQClientFactory.getAnExistTopicRouteData(routeTopic);
}
if (topicRouteData != null) {
List<String> brokerAddrs = new LinkedList<>();
for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {
- if (clusterName != null && !clusterName.isEmpty()
+ if (!isLmq && clusterName != null && !clusterName.isEmpty()
&& !clusterName.equals(brokerData.getCluster())) {
continue;
}
@@ -347,7 +360,11 @@ public class MQAdminImpl {
for (String addr : brokerAddrs) {
try {
QueryMessageRequestHeader requestHeader = new
QueryMessageRequestHeader();
- requestHeader.setTopic(topic);
+ if (isLmq) {
+ requestHeader.setTopic(clusterName);
+ } else {
+ requestHeader.setTopic(topic);
+ }
requestHeader.setKey(key);
requestHeader.setMaxNum(maxNum);
requestHeader.setBeginTimestamp(begin);
@@ -436,7 +453,7 @@ public class MQAdminImpl {
String[] keyArray =
keys.split(MessageConst.KEY_SEPARATOR);
for (String k : keyArray) {
// both topic and key must be equal at the
same time
- if (Objects.equals(key, k) &&
Objects.equals(topic, msgTopic)) {
+ if (Objects.equals(key, k) && (isLmq ||
Objects.equals(topic, msgTopic))) {
matched = true;
break;
}
diff --git
a/client/src/test/java/org/apache/rocketmq/client/impl/MQAdminImplTest.java
b/client/src/test/java/org/apache/rocketmq/client/impl/MQAdminImplTest.java
index 3663df24d6..f52aba2dc0 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/MQAdminImplTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQAdminImplTest.java
@@ -165,7 +165,7 @@ public class MQAdminImplTest {
callback.operationSucceed(response);
return null;
}).when(mQClientAPIImpl).queryMessage(anyString(), any(), anyLong(),
any(InvokeCallback.class), any());
- QueryResult actual = mqAdminImpl.queryMessage(defaultTopic, "keys",
100, 1L, 50L, false);
+ QueryResult actual = mqAdminImpl.queryMessage(defaultTopic, "keys",
100, 1L, 50L);
assertNotNull(actual);
assertEquals(1, actual.getMessageList().size());
assertEquals(defaultTopic, actual.getMessageList().get(0).getTopic());
diff --git
a/example/src/main/java/org/apache/rocketmq/example/simple/LMQProducer.java
b/example/src/main/java/org/apache/rocketmq/example/lmq/LMQProducer.java
similarity index 97%
rename from
example/src/main/java/org/apache/rocketmq/example/simple/LMQProducer.java
rename to example/src/main/java/org/apache/rocketmq/example/lmq/LMQProducer.java
index 81ef2e1385..5fee948028 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/LMQProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/lmq/LMQProducer.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.example.simple;
+package org.apache.rocketmq.example.lmq;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
@@ -47,6 +47,7 @@ public class LMQProducer {
for (int i = 0; i < 128; i++) {
try {
Message msg = new Message(TOPIC, TAG, ("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET));
+ msg.setKeys("Key" + i);
msg.putUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH
/* "INNER_MULTI_DISPATCH" */,
String.join(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER,
LMQ_TOPIC_1, LMQ_TOPIC_2) /* "%LMQ%123,%LMQ%456" */);
SendResult sendResult = producer.send(msg);
diff --git
a/example/src/main/java/org/apache/rocketmq/example/simple/LMQPullConsumer.java
b/example/src/main/java/org/apache/rocketmq/example/lmq/LMQPullConsumer.java
similarity index 98%
rename from
example/src/main/java/org/apache/rocketmq/example/simple/LMQPullConsumer.java
rename to
example/src/main/java/org/apache/rocketmq/example/lmq/LMQPullConsumer.java
index 7b1bdc3921..931dd96b48 100644
---
a/example/src/main/java/org/apache/rocketmq/example/simple/LMQPullConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/lmq/LMQPullConsumer.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.example.simple;
+package org.apache.rocketmq.example.lmq;
import java.util.Arrays;
import java.util.HashSet;
diff --git
a/example/src/main/java/org/apache/rocketmq/example/simple/LMQPushConsumer.java
b/example/src/main/java/org/apache/rocketmq/example/lmq/LMQPushConsumer.java
similarity index 98%
rename from
example/src/main/java/org/apache/rocketmq/example/simple/LMQPushConsumer.java
rename to
example/src/main/java/org/apache/rocketmq/example/lmq/LMQPushConsumer.java
index efe37d8681..f8926a05df 100644
---
a/example/src/main/java/org/apache/rocketmq/example/simple/LMQPushConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/lmq/LMQPushConsumer.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.example.simple;
+package org.apache.rocketmq.example.lmq;
import com.google.common.collect.Lists;
diff --git
a/example/src/main/java/org/apache/rocketmq/example/simple/LMQPushPopConsumer.java
b/example/src/main/java/org/apache/rocketmq/example/lmq/LMQPushPopConsumer.java
similarity index 99%
rename from
example/src/main/java/org/apache/rocketmq/example/simple/LMQPushPopConsumer.java
rename to
example/src/main/java/org/apache/rocketmq/example/lmq/LMQPushPopConsumer.java
index 2044057b2a..517eb12b7d 100644
---
a/example/src/main/java/org/apache/rocketmq/example/simple/LMQPushPopConsumer.java
+++
b/example/src/main/java/org/apache/rocketmq/example/lmq/LMQPushPopConsumer.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.example.simple;
+package org.apache.rocketmq.example.lmq;
import com.google.common.collect.Lists;
import java.util.HashMap;
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index 5be6d24ff7..6ebee1d0dd 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -153,6 +153,12 @@ public class DefaultMQAdminExt extends ClientConfig
implements MQAdminExt {
return defaultMQAdminExtImpl.queryMessage(topic, key, maxNum, begin,
end);
}
+
+ public QueryResult queryMessage(String clusterName, String topic, String
key, int maxNum, long begin, long end)
+ throws MQClientException, InterruptedException, RemotingException {
+ return defaultMQAdminExtImpl.queryMessage(clusterName, topic, key,
maxNum, begin, end);
+ }
+
@Override
public void start() throws MQClientException {
defaultMQAdminExtImpl.start();
@@ -196,7 +202,8 @@ public class DefaultMQAdminExt extends ClientConfig
implements MQAdminExt {
}
@Override
- public void createAndUpdateTopicConfigList(String addr, List<TopicConfig>
topicConfigList) throws InterruptedException, RemotingException,
MQClientException {
+ public void createAndUpdateTopicConfigList(String addr,
+ List<TopicConfig> topicConfigList) throws InterruptedException,
RemotingException, MQClientException {
defaultMQAdminExtImpl.createAndUpdateTopicConfigList(addr,
topicConfigList);
}
@@ -300,6 +307,12 @@ public class DefaultMQAdminExt extends ClientConfig
implements MQAdminExt {
return examineConsumeStats(consumerGroup, null);
}
+ @Override
+ public ConsumeStats examineConsumeStats(String clusterName, String
consumerGroup,
+ String topic) throws RemotingException, MQClientException,
InterruptedException, MQBrokerException {
+ return defaultMQAdminExtImpl.examineConsumeStats(clusterName,
consumerGroup, topic);
+ }
+
@Override
public ConsumeStats examineConsumeStats(String consumerGroup,
String topic) throws RemotingException, MQClientException,
@@ -459,16 +472,35 @@ public class DefaultMQAdminExt extends ClientConfig
implements MQAdminExt {
return defaultMQAdminExtImpl.resetOffsetByTimestampOld(consumerGroup,
topic, timestamp, force);
}
+ public List<RollbackStats> resetOffsetByTimestampOld(String clusterName,
String consumerGroup, String topic, long timestamp,
+ boolean force)
+ throws RemotingException, MQBrokerException, InterruptedException,
MQClientException {
+ return defaultMQAdminExtImpl.resetOffsetByTimestampOld(clusterName,
consumerGroup, topic, timestamp, force);
+ }
+
+ @Override
+ public Map<MessageQueue, Long> resetOffsetByTimestamp(String clusterName,
String topic, String group,
+ long timestamp, boolean isForce) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException {
+ return defaultMQAdminExtImpl.resetOffsetByTimestamp(clusterName,
topic, group, timestamp, isForce);
+ }
+
@Override
public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String
group, long timestamp, boolean isForce)
throws RemotingException, MQBrokerException, InterruptedException,
MQClientException {
return resetOffsetByTimestamp(topic, group, timestamp, isForce, false);
}
+ public Map<MessageQueue, Long> resetOffsetByTimestamp(String clusterName,
String topic, String group,
+ long timestamp, boolean isForce, boolean isC)
+ throws RemotingException, MQBrokerException, InterruptedException,
MQClientException {
+ return defaultMQAdminExtImpl.resetOffsetByTimestamp(clusterName,
topic, group, timestamp, isForce, isC);
+ }
+
+
public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String
group, long timestamp, boolean isForce,
boolean isC)
throws RemotingException, MQBrokerException, InterruptedException,
MQClientException {
- return defaultMQAdminExtImpl.resetOffsetByTimestamp(topic, group,
timestamp, isForce, isC);
+ return defaultMQAdminExtImpl.resetOffsetByTimestamp(null, topic,
group, timestamp, isForce, isC);
}
@Override
@@ -589,10 +621,19 @@ public class DefaultMQAdminExt extends ClientConfig
implements MQAdminExt {
@Override
public ConsumeMessageDirectlyResult consumeMessageDirectly(final String
consumerGroup, final String clientId,
- final String topic, final String msgId) throws RemotingException,
MQClientException, InterruptedException, MQBrokerException {
+ final String topic,
+ final String msgId) throws RemotingException, MQClientException,
InterruptedException, MQBrokerException {
return defaultMQAdminExtImpl.consumeMessageDirectly(consumerGroup,
clientId, topic, msgId);
}
+ @Override
+ public ConsumeMessageDirectlyResult consumeMessageDirectly(final String
clusterName, final String consumerGroup,
+ final String clientId,
+ final String topic,
+ final String msgId) throws RemotingException, MQClientException,
InterruptedException, MQBrokerException {
+ return defaultMQAdminExtImpl.consumeMessageDirectly(clusterName,
consumerGroup, clientId, topic, msgId);
+ }
+
@Override
public List<MessageTrack> messageTrackDetail(
MessageExt msg) throws RemotingException, MQClientException,
InterruptedException,
@@ -796,10 +837,10 @@ public class DefaultMQAdminExt extends ClientConfig
implements MQAdminExt {
this.defaultMQAdminExtImpl.resetMasterFlushOffset(brokerAddr,
masterFlushOffset);
}
- public QueryResult queryMessageByUniqKey(String topic, String key, int
maxNum, long begin, long end)
+ public QueryResult queryMessageByUniqKey(String clusterName, String topic,
String key, int maxNum, long begin,
+ long end)
throws MQClientException, InterruptedException {
-
- return defaultMQAdminExtImpl.queryMessageByUniqKey(topic, key, maxNum,
begin, end);
+ return defaultMQAdminExtImpl.queryMessageByUniqKey(clusterName, topic,
key, maxNum, begin, end);
}
public DefaultMQAdminExtImpl getDefaultMQAdminExtImpl() {
@@ -831,13 +872,14 @@ public class DefaultMQAdminExt extends ClientConfig
implements MQAdminExt {
@Override
public Pair<ElectMasterResponseHeader, BrokerMemberGroup>
electMaster(String controllerAddr, String clusterName,
-
String brokerName, Long brokerId) throws RemotingException,
InterruptedException, MQBrokerException {
+ String brokerName, Long brokerId) throws RemotingException,
InterruptedException, MQBrokerException {
return this.defaultMQAdminExtImpl.electMaster(controllerAddr,
clusterName, brokerName, brokerId);
}
@Override
public void cleanControllerBrokerData(String controllerAddr, String
clusterName, String brokerName,
- String brokerControllerIdsToClean, boolean isCleanLivingBroker) throws
RemotingException, InterruptedException, MQBrokerException {
+ String brokerControllerIdsToClean,
+ boolean isCleanLivingBroker) throws RemotingException,
InterruptedException, MQBrokerException {
this.defaultMQAdminExtImpl.cleanControllerBrokerData(controllerAddr,
clusterName, brokerName, brokerControllerIdsToClean, isCleanLivingBroker);
}
@@ -876,13 +918,15 @@ public class DefaultMQAdminExt extends ClientConfig
implements MQAdminExt {
}
@Override
- public void createUser(String brokerAddr, String username, String
password, String userType) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException,
InterruptedException {
+ public void createUser(String brokerAddr, String username, String password,
+ String userType) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException,
InterruptedException {
defaultMQAdminExtImpl.createUser(brokerAddr, username, password,
userType);
}
@Override
public void updateUser(String brokerAddr, String username,
- String password, String userType, String userStatus) throws
RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, MQBrokerException, InterruptedException {
+ String password, String userType,
+ String userStatus) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException,
InterruptedException {
defaultMQAdminExtImpl.updateUser(brokerAddr, username, password,
userType, userStatus);
}
@@ -912,38 +956,45 @@ public class DefaultMQAdminExt extends ClientConfig
implements MQAdminExt {
@Override
public void createAcl(String brokerAddr, String subject, List<String>
resources, List<String> actions,
- List<String> sourceIps, String decision) throws
RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, MQBrokerException, InterruptedException {
+ List<String> sourceIps,
+ String decision) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException,
InterruptedException {
defaultMQAdminExtImpl.createAcl(brokerAddr, subject, resources,
actions, sourceIps, decision);
}
@Override
- public void createAcl(String brokerAddr, AclInfo aclInfo) throws
RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, MQBrokerException, InterruptedException {
+ public void createAcl(String brokerAddr,
+ AclInfo aclInfo) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException,
InterruptedException {
defaultMQAdminExtImpl.createAcl(brokerAddr, aclInfo);
}
@Override
public void updateAcl(String brokerAddr, String subject, List<String>
resources, List<String> actions,
- List<String> sourceIps, String decision) throws
RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, MQBrokerException, InterruptedException {
+ List<String> sourceIps,
+ String decision) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException,
InterruptedException {
defaultMQAdminExtImpl.updateAcl(brokerAddr, subject, resources,
actions, sourceIps, decision);
}
@Override
- public void updateAcl(String brokerAddr, AclInfo aclInfo) throws
RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, MQBrokerException, InterruptedException {
+ public void updateAcl(String brokerAddr,
+ AclInfo aclInfo) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException,
InterruptedException {
defaultMQAdminExtImpl.updateAcl(brokerAddr, aclInfo);
}
@Override
- public void deleteAcl(String brokerAddr, String subject, String resource)
throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, MQBrokerException, InterruptedException {
+ public void deleteAcl(String brokerAddr, String subject,
+ String resource) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException,
InterruptedException {
defaultMQAdminExtImpl.deleteAcl(brokerAddr, subject, resource);
}
@Override
- public AclInfo getAcl(String brokerAddr, String subject) throws
RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, MQBrokerException, InterruptedException {
+ public AclInfo getAcl(String brokerAddr,
+ String subject) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException,
InterruptedException {
return defaultMQAdminExtImpl.getAcl(brokerAddr, subject);
}
@Override
- public List<AclInfo> listAcl(String brokerAddr, String subjectFilter,
String resourceFilter) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException,
InterruptedException {
+ public List<AclInfo> listAcl(String brokerAddr, String subjectFilter,
+ String resourceFilter) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException,
InterruptedException {
return defaultMQAdminExtImpl.listAcl(brokerAddr, subjectFilter,
resourceFilter);
}
}
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 9546235d3e..dc4d35e704 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -422,14 +422,21 @@ public class DefaultMQAdminExtImpl implements MQAdminExt,
MQAdminExtInner {
return
this.mqClientInstance.getMQClientAPIImpl().getBrokerRuntimeInfo(brokerAddr,
timeoutMillis);
}
+ @Override
+ public ConsumeStats examineConsumeStats(
+ String consumerGroup,
+ String topic) throws RemotingException, MQClientException,
InterruptedException, MQBrokerException {
+ return examineConsumeStats(null, consumerGroup, topic);
+ }
+
@Override
public ConsumeStats examineConsumeStats(
String consumerGroup) throws RemotingException, MQClientException,
InterruptedException, MQBrokerException {
- return examineConsumeStats(consumerGroup, null);
+ return examineConsumeStats(null, consumerGroup, null);
}
@Override
- public ConsumeStats examineConsumeStats(String consumerGroup,
+ public ConsumeStats examineConsumeStats(String clusterName, String
consumerGroup,
String topic) throws RemotingException, MQClientException,
InterruptedException, MQBrokerException {
TopicRouteData topicRouteData = null;
List<String> routeTopics = new ArrayList<>();
@@ -438,6 +445,12 @@ public class DefaultMQAdminExtImpl implements MQAdminExt,
MQAdminExtInner {
routeTopics.add(topic);
routeTopics.add(KeyBuilder.buildPopRetryTopic(topic,
consumerGroup));
}
+
+ // Use clusterName topic to get topic route for lmq or
rmq_sys_wheel_timer
+ if (!StringUtils.isEmpty(topic) && (MixAll.isLmq(topic) ||
topic.equals(TopicValidator.SYSTEM_TOPIC_PREFIX + "wheel_timer")) &&
!StringUtils.isEmpty(clusterName)) {
+ routeTopics.add(clusterName);
+ }
+
for (int i = 0; i < routeTopics.size(); i++) {
try {
topicRouteData =
this.examineTopicRouteInfo(routeTopics.get(i));
@@ -467,25 +480,33 @@ public class DefaultMQAdminExtImpl implements MQAdminExt,
MQAdminExtInner {
topics.add(messageQueue.getTopic());
}
- ConsumeStats staticResult = new ConsumeStats();
- staticResult.setConsumeTps(result.getConsumeTps());
- // for topic, we put the physical stats, how about group?
- // staticResult.getOffsetTable().putAll(result.getOffsetTable());
-
- for (String currentTopic : topics) {
- TopicRouteData currentRoute =
this.examineTopicRouteInfo(currentTopic);
- if (currentRoute.getTopicQueueMappingByBroker() == null
- || currentRoute.getTopicQueueMappingByBroker().isEmpty()) {
- //normal topic
- for (Map.Entry<MessageQueue, OffsetWrapper> entry :
result.getOffsetTable().entrySet()) {
- if (entry.getKey().getTopic().equals(currentTopic)) {
- staticResult.getOffsetTable().put(entry.getKey(),
entry.getValue());
+ ConsumeStats staticResult = null;
+
+ if (StringUtils.isEmpty(clusterName)) {
+
+ staticResult = new ConsumeStats();
+ staticResult.setConsumeTps(result.getConsumeTps());
+ // for topic, we put the physical stats, how about group?
+ // staticResult.getOffsetTable().putAll(result.getOffsetTable());
+
+ for (String currentTopic : topics) {
+ TopicRouteData currentRoute =
this.examineTopicRouteInfo(currentTopic);
+ if (currentRoute.getTopicQueueMappingByBroker() == null
+ || currentRoute.getTopicQueueMappingByBroker().isEmpty()) {
+ //normal topic
+ for (Map.Entry<MessageQueue, OffsetWrapper> entry :
result.getOffsetTable().entrySet()) {
+ if (entry.getKey().getTopic().equals(currentTopic)) {
+ staticResult.getOffsetTable().put(entry.getKey(),
entry.getValue());
+ }
}
}
+ Map<String, TopicConfigAndQueueMapping> brokerConfigMap =
MQAdminUtils.examineTopicConfigFromRoute(currentTopic, currentRoute,
defaultMQAdminExt);
+ ConsumeStats consumeStats =
MQAdminUtils.convertPhysicalConsumeStats(brokerConfigMap, result);
+
staticResult.getOffsetTable().putAll(consumeStats.getOffsetTable());
}
- Map<String, TopicConfigAndQueueMapping> brokerConfigMap =
MQAdminUtils.examineTopicConfigFromRoute(currentTopic, currentRoute,
defaultMQAdminExt);
- ConsumeStats consumeStats =
MQAdminUtils.convertPhysicalConsumeStats(brokerConfigMap, result);
-
staticResult.getOffsetTable().putAll(consumeStats.getOffsetTable());
+
+ } else {
+ staticResult = result;
}
if (staticResult.getOffsetTable().isEmpty()) {
@@ -811,10 +832,16 @@ public class DefaultMQAdminExtImpl implements MQAdminExt,
MQAdminExtInner {
this.mqClientInstance.getMQClientAPIImpl().deleteKVConfigValue(namespace, key,
timeoutMillis);
}
- @Override
- public List<RollbackStats> resetOffsetByTimestampOld(String consumerGroup,
String topic, long timestamp,
+ public List<RollbackStats> resetOffsetByTimestampOld(String clusterName,
String consumerGroup, String topic,
+ long timestamp,
boolean force) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
- TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
+ String routeTopic = topic;
+ // Use clusterName topic to get topic route for lmq or
rmq_sys_wheel_timer
+ if (!StringUtils.isEmpty(topic) && (MixAll.isLmq(topic) ||
topic.equals(TopicValidator.SYSTEM_TOPIC_PREFIX + "wheel_timer"))
+ && !StringUtils.isEmpty(clusterName)) {
+ routeTopic = clusterName;
+ }
+ TopicRouteData topicRouteData = this.examineTopicRouteInfo(routeTopic);
List<RollbackStats> rollbackStatsList = new ArrayList<>();
Map<String, QueueData> topicRouteMap = new HashMap<>();
for (QueueData queueData : topicRouteData.getQueueDatas()) {
@@ -829,6 +856,12 @@ public class DefaultMQAdminExtImpl implements MQAdminExt,
MQAdminExtInner {
return rollbackStatsList;
}
+ @Override
+ public List<RollbackStats> resetOffsetByTimestampOld(String consumerGroup,
String topic, long timestamp,
+ boolean force) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
+ return resetOffsetByTimestampOld(null, consumerGroup, topic,
timestamp, force);
+ }
+
private List<RollbackStats> resetOffsetByTimestampOld(String brokerAddr,
QueueData queueData, String consumerGroup,
String topic, long timestamp,
boolean force) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
@@ -864,7 +897,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt,
MQAdminExtInner {
@Override
public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String
group, long timestamp,
boolean isForce) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
- return resetOffsetByTimestamp(topic, group, timestamp, isForce, false);
+ return resetOffsetByTimestamp(null, topic, group, timestamp, isForce,
false);
}
@Override
@@ -951,9 +984,16 @@ public class DefaultMQAdminExtImpl implements MQAdminExt,
MQAdminExtInner {
});
}
- public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String
group, long timestamp, boolean isForce,
+ public Map<MessageQueue, Long> resetOffsetByTimestamp(String clusterName,
String topic, String group,
+ long timestamp, boolean isForce,
boolean isC) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
- TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
+ String routeTopic = topic;
+ // Use clusterName topic to get topic route for lmq or
rmq_sys_wheel_timer
+ if (!StringUtils.isEmpty(topic) && (MixAll.isLmq(topic) ||
topic.equals(TopicValidator.SYSTEM_TOPIC_PREFIX + "wheel_timer"))
+ && !StringUtils.isEmpty(clusterName)) {
+ routeTopic = clusterName;
+ }
+ TopicRouteData topicRouteData = this.examineTopicRouteInfo(routeTopic);
List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
Map<MessageQueue, Long> allOffsetTable = new HashMap<>();
if (brokerDatas != null) {
@@ -1325,7 +1365,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt,
MQAdminExtInner {
@Override
public ConsumeMessageDirectlyResult consumeMessageDirectly(final String
consumerGroup, final String clientId,
- final String topic, final String msgId) throws RemotingException,
MQClientException, InterruptedException, MQBrokerException {
+ final String topic,
+ final String msgId) throws RemotingException, MQClientException,
InterruptedException, MQBrokerException {
MessageExt msg = this.viewMessage(topic, msgId);
if
(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) {
return
this.mqClientInstance.getMQClientAPIImpl().consumeMessageDirectly(NetworkUtil.socketAddress2String(msg.getStoreHost()),
consumerGroup, clientId, topic, msgId, timeoutMillis);
@@ -1335,6 +1376,20 @@ public class DefaultMQAdminExtImpl implements
MQAdminExt, MQAdminExtInner {
}
}
+ @Override
+ public ConsumeMessageDirectlyResult consumeMessageDirectly(final String
clusterName, final String consumerGroup,
+ final String clientId,
+ final String topic,
+ final String msgId) throws RemotingException, MQClientException,
InterruptedException, MQBrokerException {
+ MessageExt msg = this.queryMessage(clusterName, topic, msgId);
+ if
(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) {
+ return
this.mqClientInstance.getMQClientAPIImpl().consumeMessageDirectly(NetworkUtil.socketAddress2String(msg.getStoreHost()),
consumerGroup, clientId, topic, msgId, timeoutMillis);
+ } else {
+ MessageClientExt msgClient = (MessageClientExt) msg;
+ return
this.mqClientInstance.getMQClientAPIImpl().consumeMessageDirectly(NetworkUtil.socketAddress2String(msg.getStoreHost()),
consumerGroup, clientId, topic, msgClient.getOffsetMsgId(), timeoutMillis);
+ }
+ }
+
@Override
public List<MessageTrack> messageTrackDetail(
MessageExt msg) throws RemotingException, MQClientException,
InterruptedException, MQBrokerException {
@@ -1664,10 +1719,10 @@ public class DefaultMQAdminExtImpl implements
MQAdminExt, MQAdminExtInner {
while (iterator.hasNext()) {
TopicConfig topicConfig = iterator.next().getValue();
if (topicList.getTopicList().contains(topicConfig.getTopicName())
- ||
TopicValidator.isSystemTopic(topicConfig.getTopicName())) {
+ || TopicValidator.isSystemTopic(topicConfig.getTopicName())) {
iterator.remove();
} else if (!specialTopic &&
StringUtils.startsWithAny(topicConfig.getTopicName(),
- MixAll.RETRY_GROUP_TOPIC_PREFIX,
MixAll.DLQ_GROUP_TOPIC_PREFIX)) {
+ MixAll.RETRY_GROUP_TOPIC_PREFIX,
MixAll.DLQ_GROUP_TOPIC_PREFIX)) {
iterator.remove();
} else if (!PermName.isValid(topicConfig.getPerm())) {
iterator.remove();
@@ -1726,6 +1781,11 @@ public class DefaultMQAdminExtImpl implements
MQAdminExt, MQAdminExtInner {
return this.mqClientInstance.getMQAdminImpl().queryMessage(topic, key,
maxNum, begin, end);
}
+ public QueryResult queryMessage(String clusterName, String topic, String
key, int maxNum, long begin,
+ long end) throws MQClientException, InterruptedException,
RemotingException {
+ return
this.mqClientInstance.getMQAdminImpl().queryMessage(clusterName, topic, key,
maxNum, begin, end, false);
+ }
+
@Override
public void updateConsumeOffset(String brokerAddr, String consumeGroup,
MessageQueue mq,
long offset) throws RemotingException, InterruptedException,
MQBrokerException {
@@ -1783,10 +1843,9 @@ public class DefaultMQAdminExtImpl implements
MQAdminExt, MQAdminExtInner {
return
this.mqClientInstance.getMQClientAPIImpl().searchOffset(brokerAddr, topicName,
queueId, timestamp, timeoutMillis);
}
- public QueryResult queryMessageByUniqKey(String topic, String key, int
maxNum, long begin,
+ public QueryResult queryMessageByUniqKey(String clusterName, String topic,
String key, int maxNum, long begin,
long end) throws MQClientException, InterruptedException {
-
- return
this.mqClientInstance.getMQAdminImpl().queryMessageByUniqKey(topic, key,
maxNum, begin, end);
+ return
this.mqClientInstance.getMQAdminImpl().queryMessageByUniqKey(clusterName,
topic, key, maxNum, begin, end);
}
@Override
@@ -1812,6 +1871,12 @@ public class DefaultMQAdminExtImpl implements
MQAdminExt, MQAdminExtInner {
}
}
+ @Override
+ public Map<MessageQueue, Long> resetOffsetByTimestamp(String clusterName,
String topic, String group,
+ long timestamp, boolean isForce) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException {
+ return resetOffsetByTimestamp(clusterName, topic, group, timestamp,
isForce, false);
+ }
+
@Override
public HARuntimeInfo getBrokerHAStatus(
String brokerAddr) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
MQBrokerException {
@@ -1844,7 +1909,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt,
MQAdminExtInner {
@Override
public Pair<ElectMasterResponseHeader, BrokerMemberGroup>
electMaster(String controllerAddr, String clusterName,
-
String brokerName, Long brokerId) throws RemotingException,
InterruptedException, MQBrokerException {
+ String brokerName, Long brokerId) throws RemotingException,
InterruptedException, MQBrokerException {
return
this.mqClientInstance.getMQClientAPIImpl().electMaster(controllerAddr,
clusterName, brokerName, brokerId);
}
@@ -1930,20 +1995,23 @@ public class DefaultMQAdminExtImpl implements
MQAdminExt, MQAdminExtInner {
}
@Override
- public void createUser(String brokerAddr, String username, String
password, String userType) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException,
InterruptedException {
+ public void createUser(String brokerAddr, String username, String password,
+ String userType) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException,
InterruptedException {
UserInfo userInfo = UserInfo.of(username, password, userType);
this.createUser(brokerAddr, userInfo);
}
@Override
public void updateUser(String brokerAddr, String username,
- String password, String userType, String userStatus) throws
RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, MQBrokerException, InterruptedException {
+ String password, String userType,
+ String userStatus) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException,
InterruptedException {
UserInfo userInfo = UserInfo.of(username, password, userType,
userStatus);
this.mqClientInstance.getMQClientAPIImpl().updateUser(brokerAddr,
userInfo, timeoutMillis);
}
@Override
- public void updateUser(String brokerAddr, UserInfo userInfo) throws
RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, MQBrokerException, InterruptedException {
+ public void updateUser(String brokerAddr,
+ UserInfo userInfo) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException,
InterruptedException {
this.mqClientInstance.getMQClientAPIImpl().updateUser(brokerAddr,
userInfo, timeoutMillis);
}
@@ -1967,40 +2035,47 @@ public class DefaultMQAdminExtImpl implements
MQAdminExt, MQAdminExtInner {
@Override
public void createAcl(String brokerAddr, String subject, List<String>
resources, List<String> actions,
- List<String> sourceIps, String decision) throws
RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, MQBrokerException, InterruptedException {
+ List<String> sourceIps,
+ String decision) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException,
InterruptedException {
AclInfo aclInfo = AclInfo.of(subject, resources, actions, sourceIps,
decision);
this.createAcl(brokerAddr, aclInfo);
}
@Override
- public void createAcl(String brokerAddr, AclInfo aclInfo) throws
RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, MQBrokerException, InterruptedException {
+ public void createAcl(String brokerAddr,
+ AclInfo aclInfo) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException,
InterruptedException {
this.mqClientInstance.getMQClientAPIImpl().createAcl(brokerAddr,
aclInfo, timeoutMillis);
}
@Override
public void updateAcl(String brokerAddr, String subject, List<String>
resources, List<String> actions,
- List<String> sourceIps, String decision) throws
RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, MQBrokerException, InterruptedException {
+ List<String> sourceIps,
+ String decision) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException,
InterruptedException {
AclInfo aclInfo = AclInfo.of(subject, resources, actions, sourceIps,
decision);
this.updateAcl(brokerAddr, aclInfo);
}
@Override
- public void updateAcl(String brokerAddr, AclInfo aclInfo) throws
RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, MQBrokerException, InterruptedException {
+ public void updateAcl(String brokerAddr,
+ AclInfo aclInfo) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException,
InterruptedException {
this.mqClientInstance.getMQClientAPIImpl().updateAcl(brokerAddr,
aclInfo, timeoutMillis);
}
@Override
- public void deleteAcl(String brokerAddr, String subject, String resource)
throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, MQBrokerException, InterruptedException {
+ public void deleteAcl(String brokerAddr, String subject,
+ String resource) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException,
InterruptedException {
this.mqClientInstance.getMQClientAPIImpl().deleteAcl(brokerAddr,
subject, resource, timeoutMillis);
}
@Override
- public AclInfo getAcl(String brokerAddr, String subject) throws
RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, MQBrokerException, InterruptedException {
+ public AclInfo getAcl(String brokerAddr,
+ String subject) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException,
InterruptedException {
return this.mqClientInstance.getMQClientAPIImpl().getAcl(brokerAddr,
subject, timeoutMillis);
}
@Override
- public List<AclInfo> listAcl(String brokerAddr, String subjectFilter,
String resourceFilter) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException,
InterruptedException {
+ public List<AclInfo> listAcl(String brokerAddr, String subjectFilter,
+ String resourceFilter) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException,
InterruptedException {
return this.mqClientInstance.getMQClientAPIImpl().listAcl(brokerAddr,
subjectFilter, resourceFilter, timeoutMillis);
}
}
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index 9dff3cbab9..ff78f22c70 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -152,6 +152,10 @@ public interface MQAdminExt extends MQAdmin {
final String topic) throws RemotingException, MQClientException,
InterruptedException, MQBrokerException;
+ ConsumeStats examineConsumeStats(final String clusterName, final String
consumerGroup,
+ final String topic) throws RemotingException, MQClientException,
+ InterruptedException, MQBrokerException;
+
ConsumeStats examineConsumeStats(final String brokerAddr, final String
consumerGroup, final String topicName,
final long timeoutMillis) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQBrokerException;
@@ -232,6 +236,9 @@ public interface MQAdminExt extends MQAdmin {
Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group,
long timestamp, boolean isForce)
throws RemotingException, MQBrokerException, InterruptedException,
MQClientException;
+ Map<MessageQueue, Long> resetOffsetByTimestamp(String clusterName, String
topic, String group, long timestamp, boolean isForce)
+ throws RemotingException, MQBrokerException, InterruptedException,
MQClientException;
+
void resetOffsetNew(String consumerGroup, String topic, long timestamp)
throws RemotingException, MQBrokerException,
InterruptedException, MQClientException;
@@ -293,6 +300,11 @@ public interface MQAdminExt extends MQAdmin {
String topic,
String msgId) throws RemotingException, MQClientException,
InterruptedException, MQBrokerException;
+ ConsumeMessageDirectlyResult consumeMessageDirectly(String clusterName,
String consumerGroup,
+ String clientId,
+ String topic,
+ String msgId) throws RemotingException, MQClientException,
InterruptedException, MQBrokerException;
+
List<MessageTrack> messageTrackDetail(
MessageExt msg) throws RemotingException, MQClientException,
InterruptedException,
MQBrokerException;
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
index c489cad684..b638dcf61f 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
@@ -72,6 +72,10 @@ public class ConsumerProgressSubCommand implements
SubCommand {
optionShowClientIP.setRequired(false);
options.addOption(optionShowClientIP);
+ opt = new Option("c", "cluster", true, "Cluster name or lmq parent
topic, lmq is used to find the route.");
+ opt.setRequired(false);
+ options.addOption(opt);
+
return options;
}
@@ -109,6 +113,8 @@ public class ConsumerProgressSubCommand implements
SubCommand {
boolean showClientIP = commandLine.hasOption('s')
&& "true".equalsIgnoreCase(commandLine.getOptionValue('s'));
+ String clusterName = commandLine.hasOption('c') ?
commandLine.getOptionValue('c').trim() : null;
+
if (commandLine.hasOption('g')) {
String consumerGroup = commandLine.getOptionValue('g').trim();
String topicName = commandLine.hasOption('t') ?
commandLine.getOptionValue('t').trim() : null;
@@ -116,7 +122,7 @@ public class ConsumerProgressSubCommand implements
SubCommand {
if (topicName == null) {
consumeStats =
defaultMQAdminExt.examineConsumeStats(consumerGroup);
} else {
- consumeStats =
defaultMQAdminExt.examineConsumeStats(consumerGroup, topicName);
+ consumeStats =
defaultMQAdminExt.examineConsumeStats(clusterName, consumerGroup, topicName);
}
List<MessageQueue> mqList = new
LinkedList<>(consumeStats.getOffsetTable().keySet());
Collections.sort(mqList);
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java
index 5245ca089f..e83029eed3 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java
@@ -44,9 +44,10 @@ import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class QueryMsgByIdSubCommand implements SubCommand {
- public static void queryById(final DefaultMQAdminExt admin, final String
topic, final String msgId, final Charset msgBodyCharset) throws
MQClientException,
+ public static void queryById(final DefaultMQAdminExt admin, final String
clusterName, final String topic,
+ final String msgId, final Charset msgBodyCharset) throws
MQClientException,
RemotingException, MQBrokerException, InterruptedException,
IOException {
- MessageExt msg = admin.viewMessage(topic, msgId);
+ MessageExt msg = admin.queryMessage(clusterName, topic, msgId);
printMsg(admin, msg, msgBodyCharset);
}
@@ -55,7 +56,8 @@ public class QueryMsgByIdSubCommand implements SubCommand {
printMsg(admin, msg, null);
}
- public static void printMsg(final DefaultMQAdminExt admin, final
MessageExt msg, final Charset msgBodyCharset) throws IOException {
+ public static void printMsg(final DefaultMQAdminExt admin, final
MessageExt msg,
+ final Charset msgBodyCharset) throws IOException {
if (msg == null) {
System.out.printf("%nMessage not found!");
return;
@@ -219,6 +221,10 @@ public class QueryMsgByIdSubCommand implements SubCommand {
opt.setRequired(false);
options.addOption(opt);
+ opt = new Option("c", "cluster", true, "Cluster name or lmq parent
topic, lmq is used to find the route.");
+ opt.setRequired(false);
+ options.addOption(opt);
+
return options;
}
@@ -244,13 +250,14 @@ public class QueryMsgByIdSubCommand implements SubCommand
{
final String msgIds = commandLine.getOptionValue('i').trim();
final String[] msgIdArr = StringUtils.split(msgIds, ",");
+ String clusterName = commandLine.hasOption('c') ?
commandLine.getOptionValue('c').trim() : null;
if (commandLine.hasOption('g') && commandLine.hasOption('d')) {
final String consumerGroup =
commandLine.getOptionValue('g').trim();
final String clientId = commandLine.getOptionValue('d').trim();
for (String msgId : msgIdArr) {
if (StringUtils.isNotBlank(msgId)) {
- pushMsg(defaultMQAdminExt, consumerGroup, clientId,
topic, msgId.trim());
+ pushMsg(defaultMQAdminExt, clusterName, consumerGroup,
clientId, topic, msgId.trim());
}
}
} else if (commandLine.hasOption('s')) {
@@ -258,7 +265,7 @@ public class QueryMsgByIdSubCommand implements SubCommand {
if (resend) {
for (String msgId : msgIdArr) {
if (StringUtils.isNotBlank(msgId)) {
- sendMsg(defaultMQAdminExt, defaultMQProducer,
topic, msgId.trim());
+ sendMsg(defaultMQAdminExt, clusterName,
defaultMQProducer, topic, msgId.trim());
}
}
}
@@ -269,7 +276,7 @@ public class QueryMsgByIdSubCommand implements SubCommand {
}
for (String msgId : msgIdArr) {
if (StringUtils.isNotBlank(msgId)) {
- queryById(defaultMQAdminExt, topic, msgId.trim(),
msgBodyCharset);
+ queryById(defaultMQAdminExt, clusterName, topic,
msgId.trim(), msgBodyCharset);
}
}
@@ -282,13 +289,14 @@ public class QueryMsgByIdSubCommand implements SubCommand
{
}
}
- private void pushMsg(final DefaultMQAdminExt defaultMQAdminExt, final
String consumerGroup, final String clientId,
+ private void pushMsg(final DefaultMQAdminExt defaultMQAdminExt, final
String clusterName,
+ final String consumerGroup, final String clientId,
final String topic, final String msgId) {
try {
ConsumerRunningInfo consumerRunningInfo =
defaultMQAdminExt.getConsumerRunningInfo(consumerGroup, clientId, false, false);
if (consumerRunningInfo != null &&
ConsumerRunningInfo.isPushType(consumerRunningInfo)) {
ConsumeMessageDirectlyResult result =
-
defaultMQAdminExt.consumeMessageDirectly(consumerGroup, clientId, topic, msgId);
+ defaultMQAdminExt.consumeMessageDirectly(clusterName,
consumerGroup, clientId, topic, msgId);
System.out.printf("%s", result);
} else {
System.out.printf("this %s client is not push consumer ,not
support direct push \n", clientId);
@@ -298,10 +306,11 @@ public class QueryMsgByIdSubCommand implements SubCommand
{
}
}
- private void sendMsg(final DefaultMQAdminExt defaultMQAdminExt, final
DefaultMQProducer defaultMQProducer,
+ private void sendMsg(final DefaultMQAdminExt defaultMQAdminExt, final
String clusterName,
+ final DefaultMQProducer defaultMQProducer,
final String topic, final String msgId) {
try {
- MessageExt msg = defaultMQAdminExt.viewMessage(topic, msgId);
+ MessageExt msg = defaultMQAdminExt.queryMessage(clusterName,
topic, msgId);
if (msg != null) {
// resend msg by id
System.out.printf("prepare resend msg. originalMsgId=%s",
msgId);
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java
index 64627fd19f..02961c3bb5 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java
@@ -23,6 +23,8 @@ import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
@@ -41,7 +43,7 @@ public class QueryMsgByKeySubCommand implements SubCommand {
@Override
public Options buildCommandlineOptions(Options options) {
- Option opt = new Option("t", "topic", true, "topic name");
+ Option opt = new Option("t", "topic", true, "Topic name");
opt.setRequired(true);
options.addOption(opt);
@@ -57,7 +59,11 @@ public class QueryMsgByKeySubCommand implements SubCommand {
opt.setRequired(false);
options.addOption(opt);
- opt = new Option("c", "maxNum", true, "The maximum number of messages
returned by the query, default:64");
+ opt = new Option("m", "maxNum", true, "The maximum number of messages
returned by the query, default:64");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("c", "cluster", true, "Cluster name or lmq parent
topic, lmq is used to find the route.");
opt.setRequired(false);
options.addOption(opt);
@@ -77,16 +83,20 @@ public class QueryMsgByKeySubCommand implements SubCommand {
long beginTimestamp = 0;
long endTimestamp = Long.MAX_VALUE;
int maxNum = 64;
+ String clusterName = null;
if (commandLine.hasOption("b")) {
beginTimestamp =
Long.parseLong(commandLine.getOptionValue("b").trim());
}
if (commandLine.hasOption("e")) {
endTimestamp =
Long.parseLong(commandLine.getOptionValue("e").trim());
}
+ if (commandLine.hasOption("m")) {
+ maxNum =
Integer.parseInt(commandLine.getOptionValue("m").trim());
+ }
if (commandLine.hasOption("c")) {
- maxNum =
Integer.parseInt(commandLine.getOptionValue("c").trim());
+ clusterName = commandLine.getOptionValue("c").trim();
}
- this.queryByKey(defaultMQAdminExt, topic, key, maxNum,
beginTimestamp, endTimestamp);
+ this.queryByKey(defaultMQAdminExt, clusterName, topic, key,
maxNum, beginTimestamp, endTimestamp);
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + "
command failed", e);
} finally {
@@ -94,12 +104,13 @@ public class QueryMsgByKeySubCommand implements SubCommand
{
}
}
- private void queryByKey(final DefaultMQAdminExt admin, final String topic,
final String key, int maxNum, long begin,
+ private void queryByKey(final DefaultMQAdminExt admin, final String
cluster, final String topic, final String key, int maxNum, long begin,
long end)
- throws MQClientException, InterruptedException {
+ throws MQClientException, InterruptedException, RemotingException {
admin.start();
- QueryResult queryResult = admin.queryMessage(topic, key, maxNum,
begin, end);
+ QueryResult queryResult = admin.queryMessage(cluster, topic, key,
maxNum, begin, end);
+
System.out.printf("%-50s %4s %40s%n",
"#Message ID",
"#QID",
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java
index b71cee9016..5295d91cc3 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java
@@ -25,13 +25,11 @@ import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.client.QueryResult;
-import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
-import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
@@ -51,19 +49,18 @@ public class QueryMsgByUniqueKeySubCommand implements
SubCommand {
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
defaultMQAdminExt.start();
- }
- catch (Exception e) {
+ } catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName()
+ " command failed", e);
}
return defaultMQAdminExt;
}
}
- public static void queryById(final DefaultMQAdminExt admin, final String
topic, final String msgId,
- final boolean showAll) throws
MQClientException,
- RemotingException, MQBrokerException, InterruptedException,
IOException {
+ public static void queryById(final DefaultMQAdminExt admin, final String
clusterName, final String topic,
+ final String msgId,
+ final boolean showAll) throws MQClientException, InterruptedException,
IOException {
- QueryResult queryResult = admin.queryMessageByUniqKey(topic, msgId,
32, 0, Long.MAX_VALUE);
+ QueryResult queryResult = admin.queryMessageByUniqKey(clusterName,
topic, msgId, 32, 0, Long.MAX_VALUE);
assert queryResult != null;
List<MessageExt> list = queryResult.getMessageList();
if (list == null || list.size() == 0) {
@@ -94,7 +91,7 @@ public class QueryMsgByUniqueKeySubCommand implements
SubCommand {
System.out.printf(strFormat, "Store Host:",
RemotingHelper.parseSocketAddressAddr(msg.getStoreHost()));
System.out.printf(intFormat, "System Flag:", msg.getSysFlag());
System.out.printf(strFormat, "Properties:",
- msg.getProperties() != null ? msg.getProperties().toString() :
"");
+ msg.getProperties() != null ? msg.getProperties().toString() : "");
System.out.printf(strFormat, "Message Body Path:", bodyTmpFilePath);
try {
@@ -166,6 +163,10 @@ public class QueryMsgByUniqueKeySubCommand implements
SubCommand {
opt.setRequired(false);
options.addOption(opt);
+ opt = new Option("c", "cluster", true, "Cluster name or lmq parent
topic, lmq is used to find the route.");
+ opt.setRequired(false);
+ options.addOption(opt);
+
return options;
}
@@ -173,10 +174,11 @@ public class QueryMsgByUniqueKeySubCommand implements
SubCommand {
public void execute(CommandLine commandLine, Options options, RPCHook
rpcHook) throws SubCommandException {
try {
- defaultMQAdminExt = createMQAdminExt(rpcHook);
+ defaultMQAdminExt = createMQAdminExt(rpcHook);
final String msgId = commandLine.getOptionValue('i').trim();
final String topic = commandLine.getOptionValue('t').trim();
+ String clusterName = commandLine.hasOption('c') ?
commandLine.getOptionValue('c').trim() : null;
final boolean showAll = commandLine.hasOption('a');
if (commandLine.hasOption('g') && commandLine.hasOption('d')) {
final String consumerGroup =
commandLine.getOptionValue('g').trim();
@@ -189,14 +191,14 @@ public class QueryMsgByUniqueKeySubCommand implements
SubCommand {
}
if (consumerRunningInfo != null &&
ConsumerRunningInfo.isPushType(consumerRunningInfo)) {
ConsumeMessageDirectlyResult result =
-
defaultMQAdminExt.consumeMessageDirectly(consumerGroup, clientId, topic, msgId);
+
defaultMQAdminExt.consumeMessageDirectly(consumerGroup, clientId, topic, msgId);
System.out.printf("%s", result);
} else {
- System.out.printf("get consumer info failed or this %s
client is not push consumer ,not support direct push \n", clientId);
+ System.out.printf("get consumer info failed or this %s
client is not push consumer, not support direct push \n", clientId);
}
} else {
- queryById(defaultMQAdminExt, topic, msgId, showAll);
+ queryById(defaultMQAdminExt, clusterName, topic, msgId,
showAll);
}
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + "
command failed", e);
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java
index 993fa50187..84a301bd60 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java
@@ -77,6 +77,10 @@ public class ResetOffsetByTimeCommand implements SubCommand {
opt.setRequired(false);
options.addOption(opt);
+ opt = new Option("c", "cluster", true, "Cluster name or lmq parent
topic, lmq is used to find the route.");
+ opt.setRequired(false);
+ options.addOption(opt);
+
return options;
}
@@ -88,6 +92,7 @@ public class ResetOffsetByTimeCommand implements SubCommand {
String group = commandLine.getOptionValue("g").trim();
String topic = commandLine.getOptionValue("t").trim();
String timeStampStr = commandLine.getOptionValue("s").trim();
+ String clusterName = commandLine.hasOption('c') ?
commandLine.getOptionValue('c').trim() : null;
long timestamp = "now".equals(timeStampStr) ?
System.currentTimeMillis() : 0;
try {
@@ -129,7 +134,7 @@ public class ResetOffsetByTimeCommand implements SubCommand
{
if (brokerAddr != null && queueId >= 0) {
System.out.printf("start reset consumer offset by specified, "
+
"group[%s], topic[%s], queueId[%s], broker[%s],
timestamp(string)[%s], timestamp(long)[%s]%n",
- group, topic, queueId, brokerAddr, timeStampStr,
timestamp);
+ group, topic, queueId, brokerAddr, timeStampStr,
timestamp);
long resetOffset = null != offset ? offset :
defaultMQAdminExt.searchOffset(brokerAddr, topic, queueId,
timestamp, 3000);
@@ -143,11 +148,11 @@ public class ResetOffsetByTimeCommand implements
SubCommand {
Map<MessageQueue, Long> offsetTable;
try {
- offsetTable = defaultMQAdminExt.resetOffsetByTimestamp(topic,
group, timestamp, force, isC);
+ offsetTable =
defaultMQAdminExt.resetOffsetByTimestamp(clusterName, topic, group, timestamp,
force, isC);
} catch (MQClientException e) {
- // if consumer not online, use old command to reset reset
+ // if consumer not online, use old command to reset
if (ResponseCode.CONSUMER_NOT_ONLINE == e.getResponseCode()) {
- ResetOffsetByTimeOldCommand.resetOffset(defaultMQAdminExt,
group, topic, timestamp, force, timeStampStr);
+ ResetOffsetByTimeOldCommand.resetOffset(defaultMQAdminExt,
clusterName, group, topic, timestamp, force, timeStampStr);
return;
}
throw e;
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommand.java
index 7984bb8c39..c179c5c805 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommand.java
@@ -34,12 +34,13 @@ import
org.apache.rocketmq.tools.command.SubCommandException;
public class ResetOffsetByTimeOldCommand implements SubCommand {
- public static void resetOffset(DefaultMQAdminExt defaultMQAdminExt, String
consumerGroup, String topic,
+ public static void resetOffset(DefaultMQAdminExt defaultMQAdminExt, String
clusterName, String consumerGroup,
+ String topic,
long timestamp, boolean force, String timeStampStr)
throws RemotingException, MQBrokerException, InterruptedException,
MQClientException {
List<RollbackStats> rollbackStatsList =
- defaultMQAdminExt.resetOffsetByTimestampOld(consumerGroup, topic,
timestamp, force);
+ defaultMQAdminExt.resetOffsetByTimestampOld(clusterName,
consumerGroup, topic, timestamp, force);
System.out.printf("reset consumer offset by specified " +
"consumerGroup[%s], topic[%s], force[%s],
timestamp(string)[%s], timestamp(long)[%s]%n",
@@ -93,6 +94,11 @@ public class ResetOffsetByTimeOldCommand implements
SubCommand {
opt = new Option("f", "force", true, "set the force rollback by
timestamp switch[true|false]");
opt.setRequired(false);
options.addOption(opt);
+
+ opt = new Option("c", "cluster", true, "Cluster name or lmq parent
topic, lmq is used to find the route.");
+ opt.setRequired(false);
+ options.addOption(opt);
+
return options;
}
@@ -104,6 +110,7 @@ public class ResetOffsetByTimeOldCommand implements
SubCommand {
String consumerGroup = commandLine.getOptionValue("g").trim();
String topic = commandLine.getOptionValue("t").trim();
String timeStampStr = commandLine.getOptionValue("s").trim();
+ String clusterName = commandLine.hasOption('c') ?
commandLine.getOptionValue('c').trim() : null;
long timestamp = 0;
try {
timestamp = Long.parseLong(timeStampStr);
@@ -123,7 +130,7 @@ public class ResetOffsetByTimeOldCommand implements
SubCommand {
force =
Boolean.parseBoolean(commandLine.getOptionValue("f").trim());
}
defaultMQAdminExt.start();
- resetOffset(defaultMQAdminExt, consumerGroup, topic, timestamp,
force, timeStampStr);
+ resetOffset(defaultMQAdminExt, clusterName, consumerGroup, topic,
timestamp, force, timeStampStr);
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + "
command failed", e);
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/SkipAccumulationSubCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/SkipAccumulationSubCommand.java
index b22491a591..8f2ac2e1e1 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/SkipAccumulationSubCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/SkipAccumulationSubCommand.java
@@ -57,6 +57,10 @@ public class SkipAccumulationSubCommand implements
SubCommand {
opt = new Option("f", "force", true, "set the force rollback by
timestamp switch[true|false]");
opt.setRequired(false);
options.addOption(opt);
+
+ opt = new Option("c", "cluster", true, "Cluster name or lmq parent
topic, lmq is used to find the route.");
+ opt.setRequired(false);
+ options.addOption(opt);
return options;
}
@@ -68,6 +72,7 @@ public class SkipAccumulationSubCommand implements SubCommand
{
try {
String group = commandLine.getOptionValue("g").trim();
String topic = commandLine.getOptionValue("t").trim();
+ String clusterName = commandLine.hasOption('c') ?
commandLine.getOptionValue('c').trim() : null;
boolean force = true;
if (commandLine.hasOption('f')) {
force =
Boolean.valueOf(commandLine.getOptionValue("f").trim());
@@ -76,7 +81,7 @@ public class SkipAccumulationSubCommand implements SubCommand
{
defaultMQAdminExt.start();
Map<MessageQueue, Long> offsetTable;
try {
- offsetTable = defaultMQAdminExt.resetOffsetByTimestamp(topic,
group, timestamp, force);
+ offsetTable =
defaultMQAdminExt.resetOffsetByTimestamp(clusterName, topic, group, timestamp,
force);
} catch (MQClientException e) {
if (ResponseCode.CONSUMER_NOT_ONLINE == e.getResponseCode()) {
List<RollbackStats> rollbackStatsList =
defaultMQAdminExt.resetOffsetByTimestampOld(group, topic, timestamp, force);
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java
index a1619ecedf..47ca761d1f 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java
@@ -27,6 +27,8 @@ import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.admin.TopicOffset;
import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
+import org.apache.rocketmq.remoting.protocol.route.BrokerData;
+import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
@@ -48,6 +50,10 @@ public class TopicStatusSubCommand implements SubCommand {
Option opt = new Option("t", "topic", true, "topic name");
opt.setRequired(true);
options.addOption(opt);
+
+ opt = new Option("c", "cluster", true, "cluster name or lmq parent
topic, lmq is used to find the route.");
+ opt.setRequired(false);
+ options.addOption(opt);
return options;
}
@@ -58,10 +64,26 @@ public class TopicStatusSubCommand implements SubCommand {
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
try {
+ TopicStatsTable topicStatsTable = new TopicStatsTable();
defaultMQAdminExt.start();
String topic = commandLine.getOptionValue('t').trim();
- TopicStatsTable topicStatsTable =
defaultMQAdminExt.examineTopicStats(topic);
+
+ if (commandLine.hasOption('c')) {
+ String cluster = commandLine.getOptionValue('c').trim();
+ TopicRouteData topicRouteData =
defaultMQAdminExt.examineTopicRouteInfo(cluster);
+
+ for (BrokerData bd : topicRouteData.getBrokerDatas()) {
+ String addr = bd.selectBrokerAddr();
+ if (addr != null) {
+ TopicStatsTable tst =
defaultMQAdminExt.examineTopicStats(addr, topic);
+
topicStatsTable.getOffsetTable().putAll(tst.getOffsetTable());
+ }
+ }
+ } else {
+ topicStatsTable = defaultMQAdminExt.examineTopicStats(topic);
+ }
List<MessageQueue> mqList = new LinkedList<>();
mqList.addAll(topicStatsTable.getOffsetTable().keySet());
diff --git
a/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommandTest.java
b/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommandTest.java
index fc5405e747..b24bd22db8 100644
---
a/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommandTest.java
+++
b/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommandTest.java
@@ -127,7 +127,7 @@ public class QueryMsgByUniqueKeySubCommandTest {
when(mQAdminImpl.queryMessageByUniqKey(anyString(),
anyString())).thenReturn(retMsgExt);
QueryResult queryResult = new QueryResult(0,
Lists.newArrayList(retMsgExt));
- when(defaultMQAdminExtImpl.queryMessageByUniqKey(anyString(),
anyString(), anyInt(), anyLong(), anyLong())).thenReturn(queryResult);
+ when(mQAdminImpl.queryMessageByUniqKey(anyString(), anyString(),
anyString(), anyInt(), anyLong(), anyLong())).thenReturn(queryResult);
TopicRouteData topicRouteData = new TopicRouteData();
List<BrokerData> brokerDataList = new ArrayList<>();
@@ -194,7 +194,7 @@ public class QueryMsgByUniqueKeySubCommandTest {
Options options = ServerUtil.buildCommandlineOptions(new Options());
- String[] args = new String[] {"-t myTopicTest", "-i msgId"};
+ String[] args = new String[] {"-t myTopicTest", "-i msgId", "-c
DefaultCluster"};
CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin ", args,
cmd.buildCommandlineOptions(options), new DefaultParser());
cmd.execute(commandLine, options, null);
@@ -218,7 +218,7 @@ public class QueryMsgByUniqueKeySubCommandTest {
Options options = ServerUtil.buildCommandlineOptions(new Options());
- String[] args = new String[] {"-t myTopicTest", "-i
7F000001000004D20000000000000066"};
+ String[] args = new String[] {"-t myTopicTest", "-i
7F000001000004D20000000000000066", "-c DefaultCluster"};
CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin ", args,
cmd.buildCommandlineOptions(options), new DefaultParser());
cmd.execute(commandLine, options, null);
@@ -230,7 +230,7 @@ public class QueryMsgByUniqueKeySubCommandTest {
Options options = ServerUtil.buildCommandlineOptions(new Options());
- String[] args = new String[] {"-t myTopicTest", "-i
0A3A54F7BF7D18B4AAC28A3FA2CF0000", "-g producerGroupName", "-d clientId"};
+ String[] args = new String[] {"-t myTopicTest", "-i
0A3A54F7BF7D18B4AAC28A3FA2CF0000", "-g producerGroupName", "-d clientId", "-c
DefaultCluster"};
CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin ", args,
cmd.buildCommandlineOptions(options), new DefaultParser());
cmd.execute(commandLine, options, null);
@@ -241,13 +241,13 @@ public class QueryMsgByUniqueKeySubCommandTest {
System.setProperty("rocketmq.namesrv.addr", "127.0.0.1:9876");
- String[] args = new String[]{"-t myTopicTest", "-i
0A3A54F7BF7D18B4AAC28A3FA2CF0000"};
+ String[] args = new String[]{"-t myTopicTest", "-i
0A3A54F7BF7D18B4AAC28A3FA2CF0000", "-c DefaultCluster"};
Options options = ServerUtil.buildCommandlineOptions(new Options());
CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin ", args,
cmd.buildCommandlineOptions(options), new DefaultParser());
cmd.execute(commandLine, options, null);
- args = new String[] {"-t myTopicTest", "-i
0A3A54F7BF7D18B4AAC28A3FA2CF0000", "-g producerGroupName", "-d clientId"};
+ args = new String[] {"-t myTopicTest", "-i
0A3A54F7BF7D18B4AAC28A3FA2CF0000", "-g producerGroupName", "-d clientId", "-c
DefaultCluster"};
commandLine = ServerUtil.parseCmdLine("mqadmin ", args,
cmd.buildCommandlineOptions(options),
new DefaultParser());
cmd.execute(commandLine, options, null);