This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-dashboard.git
The following commit(s) were added to refs/heads/master by this push:
new bbabd1c [ISSUES #281 #274 #285 #287] Speeds up topic and consumer
queries, adds caching, and fixes delay/dead-letter topic mix-up (#286)
bbabd1c is described below
commit bbabd1cd0df299c8c59d44344ffe4c369c6eef13
Author: Xu Yichi <[email protected]>
AuthorDate: Sun Apr 13 19:41:24 2025 +0800
[ISSUES #281 #274 #285 #287] Speeds up topic and consumer queries, adds
caching, and fixes delay/dead-letter topic mix-up (#286)
* fix: Resolved issue of query failure under a large number of topics and
consumers apache#281
* fix: Expand the message ID query time range to avoid query failure
* fix: Remove duplicates from topic queries, increase system topic
recognition #287
---
.../dashboard/controller/ConsumerController.java | 7 +
.../dashboard/controller/TopicController.java | 6 +
.../rocketmq/dashboard/model/GroupConsumeInfo.java | 10 +
.../dashboard/service/ClusterInfoService.java | 72 +++++++
.../dashboard/service/ConsumerService.java | 8 +-
.../rocketmq/dashboard/service/TopicService.java | 1 +
.../dashboard/service/client/MQAdminExtImpl.java | 32 +++-
.../service/impl/ConsumerServiceImpl.java | 206 +++++++++++++--------
.../dashboard/service/impl/TopicServiceImpl.java | 142 ++++++++++----
.../dashboard/task/DashboardCollectTask.java | 11 ++
src/main/resources/static/src/consumer.js | 30 ++-
src/main/resources/static/src/i18n/en.js | 1 +
src/main/resources/static/src/i18n/zh.js | 3 +-
src/main/resources/static/src/message.js | 2 +-
src/main/resources/static/src/topic.js | 43 ++++-
src/main/resources/static/view/pages/consumer.html | 10 +-
16 files changed, 444 insertions(+), 140 deletions(-)
diff --git
a/src/main/java/org/apache/rocketmq/dashboard/controller/ConsumerController.java
b/src/main/java/org/apache/rocketmq/dashboard/controller/ConsumerController.java
index 96fc056..cf4a210 100644
---
a/src/main/java/org/apache/rocketmq/dashboard/controller/ConsumerController.java
+++
b/src/main/java/org/apache/rocketmq/dashboard/controller/ConsumerController.java
@@ -51,6 +51,13 @@ public class ConsumerController {
return consumerService.queryGroupList(skipSysGroup, address);
}
+ @RequestMapping(value = "/group.refresh")
+ @ResponseBody
+ public Object refresh(String address,
+ String consumerGroup) {
+ return consumerService.refreshGroup(address, consumerGroup);
+ }
+
@RequestMapping(value = "/group.query")
@ResponseBody
public Object groupQuery(@RequestParam String consumerGroup, String
address) {
diff --git
a/src/main/java/org/apache/rocketmq/dashboard/controller/TopicController.java
b/src/main/java/org/apache/rocketmq/dashboard/controller/TopicController.java
index 467c18e..665a80a 100644
---
a/src/main/java/org/apache/rocketmq/dashboard/controller/TopicController.java
+++
b/src/main/java/org/apache/rocketmq/dashboard/controller/TopicController.java
@@ -56,6 +56,12 @@ public class TopicController {
return topicService.fetchAllTopicList(skipSysProcess, skipRetryAndDlq);
}
+ @RequestMapping(value = "/refresh", method = {RequestMethod.POST})
+ @ResponseBody
+ public Object refresh() {
+ return topicService.refreshTopicList();
+ }
+
@RequestMapping(value = "/list.queryTopicType", method = RequestMethod.GET)
@ResponseBody
public Object listTopicType() {
diff --git
a/src/main/java/org/apache/rocketmq/dashboard/model/GroupConsumeInfo.java
b/src/main/java/org/apache/rocketmq/dashboard/model/GroupConsumeInfo.java
index db11c41..358d02e 100644
--- a/src/main/java/org/apache/rocketmq/dashboard/model/GroupConsumeInfo.java
+++ b/src/main/java/org/apache/rocketmq/dashboard/model/GroupConsumeInfo.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.dashboard.model;
import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
+import java.util.Date;
import java.util.List;
public class GroupConsumeInfo implements Comparable<GroupConsumeInfo> {
@@ -31,6 +32,7 @@ public class GroupConsumeInfo implements
Comparable<GroupConsumeInfo> {
private int consumeTps;
private long diffTotal = -1;
private String subGroupType = "NORMAL";
+ private Date updateTime;
public String getGroup() {
@@ -112,4 +114,12 @@ public class GroupConsumeInfo implements
Comparable<GroupConsumeInfo> {
public void setVersion(String version) {
this.version = version;
}
+
+ public Date getUpdateTime() {
+ return updateTime;
+ }
+
+ public void setUpdateTime(Date updateTime) {
+ this.updateTime = updateTime;
+ }
}
diff --git
a/src/main/java/org/apache/rocketmq/dashboard/service/ClusterInfoService.java
b/src/main/java/org/apache/rocketmq/dashboard/service/ClusterInfoService.java
new file mode 100644
index 0000000..3dc12b3
--- /dev/null
+++
b/src/main/java/org/apache/rocketmq/dashboard/service/ClusterInfoService.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.dashboard.service;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
+import org.apache.rocketmq.tools.admin.MQAdminExt;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+@Slf4j
+@Service
+public class ClusterInfoService {
+
+ @Resource
+ private MQAdminExt mqAdminExt;
+
+ @Value("${rocketmq.cluster.cache.expire:60000}")
+ private long cacheExpireMs;
+
+
+ private final ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor();
+ private final AtomicReference<ClusterInfo> cachedRef = new
AtomicReference<>();
+
+
+ @PostConstruct
+ public void init() {
+ scheduler.scheduleAtFixedRate(this::refresh,
+ 0, cacheExpireMs / 2, TimeUnit.MILLISECONDS);
+ }
+
+ public ClusterInfo get() {
+ ClusterInfo info = cachedRef.get();
+ return info != null ? info : refresh();
+ }
+
+ public synchronized ClusterInfo refresh() {
+ try {
+ ClusterInfo fresh = mqAdminExt.examineBrokerClusterInfo();
+ cachedRef.set(fresh);
+ return fresh;
+ } catch (Exception e) {
+ log.warn("Refresh cluster info failed", e);
+ ClusterInfo old = cachedRef.get();
+ if (old != null) {
+ return old;
+ }
+ throw new IllegalStateException("No cluster info available", e);
+ }
+ }
+}
diff --git
a/src/main/java/org/apache/rocketmq/dashboard/service/ConsumerService.java
b/src/main/java/org/apache/rocketmq/dashboard/service/ConsumerService.java
index e284c44..001a184 100644
--- a/src/main/java/org/apache/rocketmq/dashboard/service/ConsumerService.java
+++ b/src/main/java/org/apache/rocketmq/dashboard/service/ConsumerService.java
@@ -17,14 +17,14 @@
package org.apache.rocketmq.dashboard.service;
+import org.apache.rocketmq.dashboard.model.request.ConsumerConfigInfo;
+import org.apache.rocketmq.dashboard.model.request.DeleteSubGroupRequest;
+import org.apache.rocketmq.dashboard.model.request.ResetOffsetRequest;
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat;
import org.apache.rocketmq.dashboard.model.GroupConsumeInfo;
import org.apache.rocketmq.dashboard.model.TopicConsumerInfo;
-import org.apache.rocketmq.dashboard.model.request.ConsumerConfigInfo;
-import org.apache.rocketmq.dashboard.model.request.DeleteSubGroupRequest;
-import org.apache.rocketmq.dashboard.model.request.ResetOffsetRequest;
import java.util.List;
import java.util.Map;
@@ -55,4 +55,6 @@ public interface ConsumerService {
ConsumerConnection getConsumerConnection(String consumerGroup, String
address);
ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String
clientId, boolean jstack);
+
+ Object refreshGroup(String address, String consumerGroup);
}
diff --git
a/src/main/java/org/apache/rocketmq/dashboard/service/TopicService.java
b/src/main/java/org/apache/rocketmq/dashboard/service/TopicService.java
index 9ff0bf0..b0f4814 100644
--- a/src/main/java/org/apache/rocketmq/dashboard/service/TopicService.java
+++ b/src/main/java/org/apache/rocketmq/dashboard/service/TopicService.java
@@ -54,4 +54,5 @@ public interface TopicService {
SendResult sendTopicMessageRequest(SendTopicMessageRequest
sendTopicMessageRequest);
+ boolean refreshTopicList();
}
diff --git
a/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java
b/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java
index 83143c3..146f9e5 100644
---
a/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java
+++
b/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java
@@ -17,11 +17,15 @@
package org.apache.rocketmq.dashboard.service.client;
import com.google.common.base.Throwables;
+
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
@@ -87,9 +91,15 @@ import static
org.apache.rocketmq.remoting.protocol.RemotingSerializable.decode;
public class MQAdminExtImpl implements MQAdminExt {
private Logger logger = LoggerFactory.getLogger(MQAdminExtImpl.class);
- public MQAdminExtImpl() {
+ private static final ConcurrentMap<String, TopicConfigSerializeWrapper>
TOPIC_CONFIG_CACHE = new ConcurrentHashMap<>();
+
+ public MQAdminExtImpl() {}
+
+ public static void clearTopicConfigCache() {
+ TOPIC_CONFIG_CACHE.clear();
}
+
@Override
public void updateBrokerConfig(String brokerAddr, Properties properties)
throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException,
@@ -145,7 +155,7 @@ public class MQAdminExtImpl implements MQAdminExt {
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG,
null);
RemotingCommand response = null;
try {
- response = remotingClient.invokeSync(addr, request, 3000);
+ response = remotingClient.invokeSync(addr, request, 8000);
}
catch (Exception err) {
Throwables.throwIfUnchecked(err);
@@ -164,19 +174,27 @@ public class MQAdminExtImpl implements MQAdminExt {
@Override
public TopicConfig examineTopicConfig(String addr, String topic) throws
MQBrokerException {
+ TopicConfigSerializeWrapper cachedWrapper =
TOPIC_CONFIG_CACHE.get(addr);
+
+ if (cachedWrapper != null &&
cachedWrapper.getTopicConfigTable().containsKey(topic)) {
+ return cachedWrapper.getTopicConfigTable().get(topic);
+ }
+
RemotingClient remotingClient =
MQAdminInstance.threadLocalRemotingClient();
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
RemotingCommand response = null;
try {
response = remotingClient.invokeSync(addr, request, 3000);
- }
- catch (Exception err) {
+ } catch (Exception err) {
Throwables.throwIfUnchecked(err);
throw new RuntimeException(err);
}
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
- TopicConfigSerializeWrapper topicConfigSerializeWrapper =
decode(response.getBody(), TopicConfigSerializeWrapper.class);
+ TopicConfigSerializeWrapper topicConfigSerializeWrapper =
+ decode(response.getBody(),
TopicConfigSerializeWrapper.class);
+
+ TOPIC_CONFIG_CACHE.put(addr, topicConfigSerializeWrapper);
return
topicConfigSerializeWrapper.getTopicConfigTable().get(topic);
}
default:
@@ -468,14 +486,14 @@ public class MQAdminExtImpl implements MQAdminExt {
Set<String> clusterList =
MQAdminInstance.threadLocalMQAdminExt().getTopicClusterList(topic);
if (clusterList == null || clusterList.isEmpty()) {
QueryResult qr = Reflect.on(mqAdminImpl).call("queryMessage", "",
topic, msgId, 32,
-
MessageClientIDSetter.getNearlyTimeFromID(msgId).getTime(), Long.MAX_VALUE,
true).get();
+ 0L, Long.MAX_VALUE, true).get();
if (qr != null && qr.getMessageList() != null &&
!qr.getMessageList().isEmpty()) {
return qr.getMessageList().get(0);
}
} else {
for (String name : clusterList) {
QueryResult qr = Reflect.on(mqAdminImpl).call("queryMessage",
name, topic, msgId, 32,
-
MessageClientIDSetter.getNearlyTimeFromID(msgId).getTime(), Long.MAX_VALUE,
true).get();
+ 0L, Long.MAX_VALUE, true).get();
if (qr != null && qr.getMessageList() != null &&
!qr.getMessageList().isEmpty()) {
return qr.getMessageList().get(0);
}
diff --git
a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java
b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java
index 9bc37ab..2f81582 100644
---
a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java
+++
b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java
@@ -23,14 +23,18 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
@@ -41,11 +45,16 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import javax.annotation.Resource;
+
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.dashboard.model.request.ConsumerConfigInfo;
+import org.apache.rocketmq.dashboard.model.request.DeleteSubGroupRequest;
+import org.apache.rocketmq.dashboard.model.request.ResetOffsetRequest;
+import org.apache.rocketmq.dashboard.service.ClusterInfoService;
import org.apache.rocketmq.dashboard.service.client.ProxyAdmin;
import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.remoting.protocol.admin.RollbackStats;
@@ -65,15 +74,13 @@ import
org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat;
import org.apache.rocketmq.dashboard.model.GroupConsumeInfo;
import org.apache.rocketmq.dashboard.model.QueueStatInfo;
import org.apache.rocketmq.dashboard.model.TopicConsumerInfo;
-import org.apache.rocketmq.dashboard.model.request.ConsumerConfigInfo;
-import org.apache.rocketmq.dashboard.model.request.DeleteSubGroupRequest;
-import org.apache.rocketmq.dashboard.model.request.ResetOffsetRequest;
import org.apache.rocketmq.dashboard.service.AbstractCommonService;
import org.apache.rocketmq.dashboard.service.ConsumerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
@@ -85,10 +92,17 @@ public class ConsumerServiceImpl extends
AbstractCommonService implements Consum
@Resource
private RMQConfigure configure;
+ @Autowired
+ private ClusterInfoService clusterInfoService;
+
+ private volatile boolean isCacheBeingBuilt = false;
+
private static final Set<String> SYSTEM_GROUP_SET = new HashSet<>();
private ExecutorService executorService;
+ private final List<GroupConsumeInfo> cacheConsumeInfoList =
Collections.synchronizedList(new ArrayList<>());
+
@Override
public void afterPropertiesSet() {
Runtime runtime = Runtime.getRuntime();
@@ -104,7 +118,7 @@ public class ConsumerServiceImpl extends
AbstractCommonService implements Consum
};
RejectedExecutionHandler handler = new
ThreadPoolExecutor.DiscardOldestPolicy();
this.executorService = new ThreadPoolExecutor(corePoolSize,
maximumPoolSize, 60L, TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(5000), threadFactory, handler);
+ new LinkedBlockingQueue<>(5000), threadFactory, handler);
}
@Override
@@ -125,11 +139,47 @@ public class ConsumerServiceImpl extends
AbstractCommonService implements Consum
@Override
public List<GroupConsumeInfo> queryGroupList(boolean skipSysGroup, String
address) {
+ if (isCacheBeingBuilt) {
+ throw new RuntimeException("Cache is being built, please try again
later");
+ }
+
+ synchronized (this) {
+ if (cacheConsumeInfoList.isEmpty() && !isCacheBeingBuilt) {
+ isCacheBeingBuilt = true;
+ try {
+ makeGroupListCache();
+ } finally {
+ isCacheBeingBuilt = false;
+ }
+ }
+ }
+
+ if (cacheConsumeInfoList.isEmpty()) {
+ throw new RuntimeException("No consumer group information
available");
+ }
+
+ List<GroupConsumeInfo> groupConsumeInfoList = new
ArrayList<>(cacheConsumeInfoList);
+
+ if (!skipSysGroup) {
+ groupConsumeInfoList.stream().map(group -> {
+ if (SYSTEM_GROUP_SET.contains(group.getGroup())) {
+ group.setGroup(String.format("%s%s", "%SYS%",
group.getGroup()));
+ }
+ return group;
+ }).collect(Collectors.toList());
+ }
+ Collections.sort(groupConsumeInfoList);
+ return groupConsumeInfoList;
+ }
+
+
+ public void makeGroupListCache() {
HashMap<String, List<String>> consumerGroupMap = Maps.newHashMap();
+ SubscriptionGroupWrapper subscriptionGroupWrapper = null;
try {
- ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
+ ClusterInfo clusterInfo = clusterInfoService.get();
for (BrokerData brokerData :
clusterInfo.getBrokerAddrTable().values()) {
- SubscriptionGroupWrapper subscriptionGroupWrapper =
mqAdminExt.getAllSubscriptionGroup(brokerData.selectBrokerAddr(), 3000L);
+ subscriptionGroupWrapper =
mqAdminExt.getAllSubscriptionGroup(brokerData.selectBrokerAddr(), 3000L);
for (String groupName :
subscriptionGroupWrapper.getSubscriptionGroupTable().keySet()) {
if (!consumerGroupMap.containsKey(groupName)) {
consumerGroupMap.putIfAbsent(groupName, new
ArrayList<>());
@@ -143,14 +193,28 @@ public class ConsumerServiceImpl extends
AbstractCommonService implements Consum
Throwables.throwIfUnchecked(err);
throw new RuntimeException(err);
}
+
+ if (subscriptionGroupWrapper != null &&
subscriptionGroupWrapper.getSubscriptionGroupTable().isEmpty()) {
+ logger.warn("No subscription group information available");
+ isCacheBeingBuilt = false;
+ return;
+ }
+ final ConcurrentMap<String, SubscriptionGroupConfig>
subscriptionGroupTable = subscriptionGroupWrapper.getSubscriptionGroupTable();
List<GroupConsumeInfo> groupConsumeInfoList =
Collections.synchronizedList(Lists.newArrayList());
CountDownLatch countDownLatch = new
CountDownLatch(consumerGroupMap.size());
for (Map.Entry<String, List<String>> entry :
consumerGroupMap.entrySet()) {
String consumerGroup = entry.getKey();
executorService.submit(() -> {
try {
- GroupConsumeInfo consumeInfo = queryGroup(consumerGroup,
address);
+ GroupConsumeInfo consumeInfo = queryGroup(consumerGroup,
"");
consumeInfo.setAddress(entry.getValue());
+ if (SYSTEM_GROUP_SET.contains(consumerGroup)) {
+ consumeInfo.setSubGroupType("SYSTEM");
+ } else {
+
consumeInfo.setSubGroupType(subscriptionGroupTable.get(consumerGroup).isConsumeMessageOrderly()
? "FIFO" : "NORMAL");
+ }
+ consumeInfo.setGroup(consumerGroup);
+ consumeInfo.setUpdateTime(new Date());
groupConsumeInfoList.add(consumeInfo);
} catch (Exception e) {
logger.error("queryGroup exception, consumerGroup: {}",
consumerGroup, e);
@@ -160,21 +224,17 @@ public class ConsumerServiceImpl extends
AbstractCommonService implements Consum
});
}
try {
- countDownLatch.await(30, TimeUnit.SECONDS);
+ countDownLatch.await();
} catch (InterruptedException e) {
- logger.error("query consumerGroup countDownLatch await Exception",
e);
- }
-
- if (!skipSysGroup) {
- groupConsumeInfoList.stream().map(group -> {
- if (SYSTEM_GROUP_SET.contains(group.getGroup())) {
- group.setGroup(String.format("%s%s", "%SYS%",
group.getGroup()));
- }
- return group;
- }).collect(Collectors.toList());
+ Thread.currentThread().interrupt();
+ logger.error("Interruption occurred while waiting for task
completion", e);
}
+ logger.info("All consumer group query tasks have been completed");
+ isCacheBeingBuilt = false;
Collections.sort(groupConsumeInfoList);
- return groupConsumeInfoList;
+
+ cacheConsumeInfoList.clear();
+ cacheConsumeInfoList.addAll(groupConsumeInfoList);
}
@Override
@@ -184,16 +244,14 @@ public class ConsumerServiceImpl extends
AbstractCommonService implements Consum
ConsumeStats consumeStats = null;
try {
consumeStats = mqAdminExt.examineConsumeStats(consumerGroup);
- }
- catch (Exception e) {
+ } catch (Exception e) {
logger.warn("examineConsumeStats exception to consumerGroup
{}, response [{}]", consumerGroup, e.getMessage());
}
-
+ if (consumeStats != null) {
+ groupConsumeInfo.setConsumeTps((int)
consumeStats.getConsumeTps());
+ groupConsumeInfo.setDiffTotal(consumeStats.computeTotalDiff());
+ }
ConsumerConnection consumerConnection = null;
- boolean isFifoType = examineSubscriptionGroupConfig(consumerGroup)
-
.stream().map(ConsumerConfigInfo::getSubscriptionGroupConfig)
-
.allMatch(SubscriptionGroupConfig::isConsumeMessageOrderly);
-
try {
if (StringUtils.isNotEmpty(address)) {
consumerConnection =
proxyAdmin.examineConsumerConnectionInfo(address, consumerGroup);
@@ -203,31 +261,15 @@ public class ConsumerServiceImpl extends
AbstractCommonService implements Consum
} catch (Exception e) {
logger.warn("examineConsumeStats exception to consumerGroup
{}, response [{}]", consumerGroup, e.getMessage());
}
-
- groupConsumeInfo.setGroup(consumerGroup);
- if (SYSTEM_GROUP_SET.contains(consumerGroup)) {
- groupConsumeInfo.setSubGroupType("SYSTEM");
- } else if (isFifoType) {
- groupConsumeInfo.setSubGroupType("FIFO");
- } else {
- groupConsumeInfo.setSubGroupType("NORMAL");
- }
-
- if (consumeStats != null) {
-
groupConsumeInfo.setConsumeTps((int)consumeStats.getConsumeTps());
- groupConsumeInfo.setDiffTotal(consumeStats.computeTotalDiff());
- }
-
if (consumerConnection != null) {
groupConsumeInfo.setCount(consumerConnection.getConnectionSet().size());
groupConsumeInfo.setMessageModel(consumerConnection.getMessageModel());
groupConsumeInfo.setConsumeType(consumerConnection.getConsumeType());
groupConsumeInfo.setVersion(MQVersion.getVersionDesc(consumerConnection.computeMinVersion()));
}
- }
- catch (Exception e) {
+ } catch (Exception e) {
logger.warn("examineConsumeStats or examineConsumerConnectionInfo
exception, "
- + consumerGroup, e);
+ + consumerGroup, e);
}
return groupConsumeInfo;
}
@@ -252,8 +294,7 @@ public class ConsumerServiceImpl extends
AbstractCommonService implements Consum
ConsumeStats consumeStats = null;
try {
consumeStats = mqAdminExt.examineConsumeStats(groupName, topic);
- }
- catch (Exception e) {
+ } catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
}
@@ -295,8 +336,7 @@ public class ConsumerServiceImpl extends
AbstractCommonService implements Consum
results.put(messageQueue, clinetId);
}
}
- }
- catch (Exception err) {
+ } catch (Exception err) {
logger.error("op=getClientConnection_error", err);
}
return results;
@@ -311,14 +351,12 @@ public class ConsumerServiceImpl extends
AbstractCommonService implements Consum
List<TopicConsumerInfo> topicConsumerInfoList = null;
try {
topicConsumerInfoList = queryConsumeStatsList(topic,
group);
- }
- catch (Exception ignore) {
+ } catch (Exception ignore) {
}
group2ConsumerInfoMap.put(group,
CollectionUtils.isEmpty(topicConsumerInfoList) ? new TopicConsumerInfo(topic) :
topicConsumerInfoList.get(0));
}
return group2ConsumerInfoMap;
- }
- catch (Exception e) {
+ } catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
}
@@ -330,7 +368,7 @@ public class ConsumerServiceImpl extends
AbstractCommonService implements Consum
for (String consumerGroup : resetOffsetRequest.getConsumerGroupList())
{
try {
Map<MessageQueue, Long> rollbackStatsMap =
-
mqAdminExt.resetOffsetByTimestamp(resetOffsetRequest.getTopic(), consumerGroup,
resetOffsetRequest.getResetTime(), resetOffsetRequest.isForce());
+
mqAdminExt.resetOffsetByTimestamp(resetOffsetRequest.getTopic(), consumerGroup,
resetOffsetRequest.getResetTime(), resetOffsetRequest.isForce());
ConsumerGroupRollBackStat consumerGroupRollBackStat = new
ConsumerGroupRollBackStat(true);
List<RollbackStats> rollbackStatsList =
consumerGroupRollBackStat.getRollbackStatsList();
for (Map.Entry<MessageQueue, Long> rollbackStatsEntty :
rollbackStatsMap.entrySet()) {
@@ -341,8 +379,7 @@ public class ConsumerServiceImpl extends
AbstractCommonService implements Consum
rollbackStatsList.add(rollbackStats);
}
groupRollbackStats.put(consumerGroup,
consumerGroupRollBackStat);
- }
- catch (MQClientException e) {
+ } catch (MQClientException e) {
if (ResponseCode.CONSUMER_NOT_ONLINE == e.getResponseCode()) {
try {
ConsumerGroupRollBackStat consumerGroupRollBackStat =
new ConsumerGroupRollBackStat(true);
@@ -350,17 +387,14 @@ public class ConsumerServiceImpl extends
AbstractCommonService implements Consum
consumerGroupRollBackStat.setRollbackStatsList(rollbackStatsList);
groupRollbackStats.put(consumerGroup,
consumerGroupRollBackStat);
continue;
- }
- catch (Exception err) {
+ } catch (Exception err) {
logger.error("op=resetOffset_which_not_online_error",
err);
}
- }
- else {
+ } else {
logger.error("op=resetOffset_error", e);
}
groupRollbackStats.put(consumerGroup, new
ConsumerGroupRollBackStat(false, e.getMessage()));
- }
- catch (Exception e) {
+ } catch (Exception e) {
logger.error("op=resetOffset_error", e);
groupRollbackStats.put(consumerGroup, new
ConsumerGroupRollBackStat(false, e.getMessage()));
}
@@ -372,17 +406,21 @@ public class ConsumerServiceImpl extends
AbstractCommonService implements Consum
public List<ConsumerConfigInfo> examineSubscriptionGroupConfig(String
group) {
List<ConsumerConfigInfo> consumerConfigInfoList = Lists.newArrayList();
try {
- ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
+ ClusterInfo clusterInfo = clusterInfoService.get();
for (String brokerName :
clusterInfo.getBrokerAddrTable().keySet()) { //foreach brokerName
String brokerAddress =
clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr();
- SubscriptionGroupConfig subscriptionGroupConfig =
mqAdminExt.examineSubscriptionGroupConfig(brokerAddress, group);
+ SubscriptionGroupConfig subscriptionGroupConfig = null;
+ try {
+ subscriptionGroupConfig =
mqAdminExt.examineSubscriptionGroupConfig(brokerAddress, group);
+ } catch (Exception e) {
+ logger.warn("op=examineSubscriptionGroupConfig_error
brokerName={} group={}", brokerName, group);
+ }
if (subscriptionGroupConfig == null) {
continue;
}
consumerConfigInfoList.add(new
ConsumerConfigInfo(Lists.newArrayList(brokerName), subscriptionGroupConfig));
}
- }
- catch (Exception e) {
+ } catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
}
@@ -399,7 +437,7 @@ public class ConsumerServiceImpl extends
AbstractCommonService implements Consum
deleteInNsFlag = true;
}
try {
- ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
+ ClusterInfo clusterInfo = clusterInfoService.get();
for (String brokerName :
deleteSubGroupRequest.getBrokerNameList()) {
logger.info("addr={} groupName={}",
clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(),
deleteSubGroupRequest.getGroupName());
mqAdminExt.deleteSubscriptionGroup(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(),
deleteSubGroupRequest.getGroupName(), true);
@@ -407,8 +445,7 @@ public class ConsumerServiceImpl extends
AbstractCommonService implements Consum
deleteResources(MixAll.RETRY_GROUP_TOPIC_PREFIX +
deleteSubGroupRequest.getGroupName(), brokerName, clusterInfo, deleteInNsFlag);
deleteResources(MixAll.DLQ_GROUP_TOPIC_PREFIX +
deleteSubGroupRequest.getGroupName(), brokerName, clusterInfo, deleteInNsFlag);
}
- }
- catch (Exception e) {
+ } catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
}
@@ -430,13 +467,12 @@ public class ConsumerServiceImpl extends
AbstractCommonService implements Consum
@Override
public boolean createAndUpdateSubscriptionGroupConfig(ConsumerConfigInfo
consumerConfigInfo) {
try {
- ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
+ ClusterInfo clusterInfo = clusterInfoService.get();
for (String brokerName :
changeToBrokerNameSet(clusterInfo.getClusterAddrTable(),
- consumerConfigInfo.getClusterNameList(),
consumerConfigInfo.getBrokerNameList())) {
+ consumerConfigInfo.getClusterNameList(),
consumerConfigInfo.getBrokerNameList())) {
mqAdminExt.createAndUpdateSubscriptionGroupConfig(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(),
consumerConfigInfo.getSubscriptionGroupConfig());
}
- }
- catch (Exception err) {
+ } catch (Exception err) {
Throwables.throwIfUnchecked(err);
throw new RuntimeException(err);
}
@@ -451,8 +487,7 @@ public class ConsumerServiceImpl extends
AbstractCommonService implements Consum
for (ConsumerConfigInfo consumerConfigInfo :
consumerConfigInfoList) {
brokerNameSet.addAll(consumerConfigInfo.getBrokerNameList());
}
- }
- catch (Exception e) {
+ } catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
}
@@ -476,10 +511,29 @@ public class ConsumerServiceImpl extends
AbstractCommonService implements Consum
public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup,
String clientId, boolean jstack) {
try {
return mqAdminExt.getConsumerRunningInfo(consumerGroup, clientId,
jstack);
- }
- catch (Exception e) {
+ } catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
}
}
+
+ @Override
+ public GroupConsumeInfo refreshGroup(String address, String consumerGroup)
{
+
+ if (isCacheBeingBuilt || cacheConsumeInfoList.isEmpty()) {
+ throw new RuntimeException("Cache is being built or empty, please
try again later");
+ }
+ synchronized (cacheConsumeInfoList) {
+ for (int i = 0; i < cacheConsumeInfoList.size(); i++) {
+ GroupConsumeInfo groupConsumeInfo =
cacheConsumeInfoList.get(i);
+ if (groupConsumeInfo.getGroup().equals(consumerGroup)) {
+ GroupConsumeInfo updatedInfo = queryGroup(consumerGroup,
"");
+ updatedInfo.setUpdateTime(new Date());
+ cacheConsumeInfoList.set(i, updatedInfo);
+ return updatedInfo;
+ }
+ }
+ }
+ throw new RuntimeException("No consumer group information available");
+ }
}
diff --git
a/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java
b/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java
index 4f34fc6..14c00a2 100644
---
a/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java
+++
b/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java
@@ -43,28 +43,35 @@ import
org.apache.rocketmq.dashboard.model.request.TopicConfigInfo;
import org.apache.rocketmq.dashboard.model.request.TopicTypeList;
import org.apache.rocketmq.dashboard.model.request.TopicTypeMeta;
import org.apache.rocketmq.dashboard.service.AbstractCommonService;
+import org.apache.rocketmq.dashboard.service.ClusterInfoService;
import org.apache.rocketmq.dashboard.service.TopicService;
+import org.apache.rocketmq.dashboard.service.client.MQAdminExtImpl;
+import org.apache.rocketmq.dashboard.support.GlobalExceptionHandler;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.protocol.body.GroupList;
+import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.remoting.protocol.body.TopicList;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.joor.Reflect;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@@ -73,9 +80,19 @@ import static
org.apache.rocketmq.common.TopicAttributes.TOPIC_MESSAGE_TYPE_ATTR
@Service
public class TopicServiceImpl extends AbstractCommonService implements
TopicService {
+ private Logger logger =
LoggerFactory.getLogger(GlobalExceptionHandler.class);
+
+ @Autowired
+ private ClusterInfoService clusterInfoService;
+
+ private final ConcurrentMap<String, TopicRouteData> routeCache = new
ConcurrentHashMap<>();
+ private final Object cacheLock = new Object();
+
@Autowired
private RMQConfigure configure;
+ private final ConcurrentMap<String, TopicConfig> topicConfigCache = new
ConcurrentHashMap<>();
+
@Override
public TopicList fetchAllTopicList(boolean skipSysProcess, boolean
skipRetryAndDlq) {
try {
@@ -105,37 +122,63 @@ public class TopicServiceImpl extends
AbstractCommonService implements TopicServ
@Override
public TopicTypeList examineAllTopicType() {
- ArrayList<TopicTypeMeta> topicTypes = new ArrayList<>();
- ArrayList<String> names = new ArrayList<>();
- ArrayList<String> messageTypes = new ArrayList<>();
- TopicList topicList = fetchAllTopicList(false, false);
- checkTopicType(topicList, topicTypes);
- topicTypes.sort((t1, t2) ->
t1.getTopicName().compareTo(t2.getTopicName()));
- for (TopicTypeMeta topicTypeMeta : topicTypes) {
- names.add(topicTypeMeta.getTopicName());
- messageTypes.add(topicTypeMeta.getMessageType());
- }
+ List<String> messageTypes = new ArrayList<>();
+ List<String> names = new ArrayList<>();
+ ClusterInfo clusterInfo = clusterInfoService.get();
+ TopicList sysTopics = getSystemTopicList();
+ clusterInfo.getBrokerAddrTable().values().forEach(brokerAddr -> {
+ try {
+ TopicConfigSerializeWrapper topicConfigSerializeWrapper =
mqAdminExt.getAllTopicConfig(brokerAddr.getBrokerAddrs().get(0L), 10000L);
+ for (TopicConfig topicConfig :
topicConfigSerializeWrapper.getTopicConfigTable().values()) {
+ TopicTypeMeta topicType =
classifyTopicType(topicConfig.getTopicName(),
topicConfigSerializeWrapper.getTopicConfigTable().get(topicConfig.getTopicName()).getAttributes(),sysTopics.getTopicList());
+ if (names.contains(topicType.getTopicName())) {
+ continue;
+ }
+ names.add(topicType.getTopicName());
+ messageTypes.add(topicType.getMessageType());
+ }
+ } catch (Exception e) {
+ logger.warn("Failed to classify topic type for broker: " +
brokerAddr, e);
+ }
+ });
+ sysTopics.getTopicList().forEach(topicName -> {
+ String sysTopicName = String.format("%s%s", "%SYS%", topicName);
+ if (!names.contains(sysTopicName)) {
+ names.add(sysTopicName);
+ messageTypes.add("SYSTEM");
+ }
+ });
+
return new TopicTypeList(names, messageTypes);
}
- private void checkTopicType(TopicList topicList, ArrayList<TopicTypeMeta>
topicTypes) {
- for (String topicName : topicList.getTopicList()) {
- TopicTypeMeta topicType = new TopicTypeMeta();
- topicType.setTopicName(topicName);
- if (topicName.startsWith("%R")) {
- topicType.setMessageType("RETRY");
- } else if (topicName.startsWith("%D")) {
- topicType.setMessageType("DELAY");
- } else if (topicName.startsWith("%S")) {
- topicType.setMessageType("SYSTEM");
- } else {
- List<TopicConfigInfo> topicConfigInfos =
examineTopicConfig(topicName);
- if (!CollectionUtils.isEmpty(topicConfigInfos)) {
-
topicType.setMessageType(topicConfigInfos.get(0).getMessageType());
- }
- }
- topicTypes.add(topicType);
+ private TopicTypeMeta classifyTopicType(String topicName,
Map<String,String> attributes, Set<String> sysTopics) {
+ TopicTypeMeta topicType = new TopicTypeMeta();
+ topicType.setTopicName(topicName);
+
+ if (topicName.startsWith("%R")) {
+ topicType.setMessageType("RETRY");
+ return topicType;
+ } else if (topicName.startsWith("%D")) {
+ topicType.setMessageType("DLQ");
+ return topicType;
+ } else if (sysTopics.contains(topicName) ||
topicName.startsWith("rmq_sys") ||
topicName.equals("DefaultHeartBeatSyncerTopic")) {
+ topicType.setMessageType("SYSTEM");
+ topicType.setTopicName(String.format("%s%s", "%SYS%", topicName));
+ return topicType;
+ }
+ if (attributes == null || attributes.isEmpty()) {
+ topicType.setMessageType("UNSPECIFIED");
+ return topicType;
+ }
+
+ String messageType =
attributes.get(TOPIC_MESSAGE_TYPE_ATTRIBUTE.getName());
+ if (StringUtils.isBlank(messageType)) {
+ messageType = TopicMessageType.UNSPECIFIED.name();
}
+ topicType.setMessageType(messageType);
+
+ return topicType;
}
@Override
@@ -150,11 +193,24 @@ public class TopicServiceImpl extends
AbstractCommonService implements TopicServ
@Override
public TopicRouteData route(String topic) {
- try {
- return mqAdminExt.examineTopicRouteInfo(topic);
- } catch (Exception ex) {
- Throwables.throwIfUnchecked(ex);
- throw new RuntimeException(ex);
+ TopicRouteData cachedData = routeCache.get(topic);
+ if (cachedData != null) {
+ return cachedData;
+ }
+
+ synchronized (cacheLock) {
+ cachedData = routeCache.get(topic);
+ if (cachedData != null) {
+ return cachedData;
+ }
+ try {
+ TopicRouteData freshData =
mqAdminExt.examineTopicRouteInfo(topic);
+ routeCache.put(topic, freshData);
+ return freshData;
+ } catch (Exception ex) {
+ Throwables.throwIfUnchecked(ex);
+ throw new RuntimeException(ex);
+ }
}
}
@@ -170,6 +226,7 @@ public class TopicServiceImpl extends AbstractCommonService
implements TopicServ
@Override
public void createOrUpdate(TopicConfigInfo topicCreateOrUpdateRequest) {
+ MQAdminExtImpl.clearTopicConfigCache();
TopicConfig topicConfig = new TopicConfig();
BeanUtils.copyProperties(topicCreateOrUpdateRequest, topicConfig);
String messageType = topicCreateOrUpdateRequest.getMessageType();
@@ -189,12 +246,15 @@ public class TopicServiceImpl extends
AbstractCommonService implements TopicServ
}
}
- @Override
public TopicConfig examineTopicConfig(String topic, String brokerName) {
- ClusterInfo clusterInfo = null;
try {
- clusterInfo = mqAdminExt.examineBrokerClusterInfo();
- return
mqAdminExt.examineTopicConfig(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(),
topic);
+ ClusterInfo clusterInfo = clusterInfoService.get();
+
+ BrokerData brokerData =
clusterInfo.getBrokerAddrTable().get(brokerName);
+ if (brokerData == null) {
+ throw new RuntimeException("Broker not found: " + brokerName);
+ }
+ return
mqAdminExt.examineTopicConfig(brokerData.selectBrokerAddr(), topic);
} catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
@@ -371,6 +431,14 @@ public class TopicServiceImpl extends
AbstractCommonService implements TopicServ
}
+ @Override
+ public boolean refreshTopicList() {
+ routeCache.clear();
+ clusterInfoService.refresh();
+ MQAdminExtImpl.clearTopicConfigCache();
+ return true;
+ }
+
private void waitSendTraceFinish(DefaultMQProducer producer, boolean
traceEnabled) {
if (!traceEnabled) {
return;
diff --git
a/src/main/java/org/apache/rocketmq/dashboard/task/DashboardCollectTask.java
b/src/main/java/org/apache/rocketmq/dashboard/task/DashboardCollectTask.java
index d58668b..c9a870c 100644
--- a/src/main/java/org/apache/rocketmq/dashboard/task/DashboardCollectTask.java
+++ b/src/main/java/org/apache/rocketmq/dashboard/task/DashboardCollectTask.java
@@ -34,6 +34,7 @@ import java.util.Set;
import java.util.concurrent.ExecutorService;
import javax.annotation.Resource;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.dashboard.service.ConsumerService;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.protocol.body.KVTable;
import org.apache.rocketmq.remoting.protocol.body.TopicList;
@@ -59,6 +60,9 @@ public class DashboardCollectTask {
@Resource
private DashboardCollectService dashboardCollectService;
+ @Resource
+ private ConsumerService consumerService;
+
private final static Logger log =
LoggerFactory.getLogger(DashboardCollectTask.class);
@Resource
@@ -89,6 +93,13 @@ public class DashboardCollectTask {
}
}
+ @Scheduled(cron = "0 0 2 * * ?")
+ public void collectConsumer() {
+ consumerService.queryGroupList(false, null);
+ }
+
+
+
@Scheduled(cron = "0 0/1 * * * ?")
public void collectBroker() {
if (!rmqConfigure.isEnableDashBoardCollect()) {
diff --git a/src/main/resources/static/src/consumer.js
b/src/main/resources/static/src/consumer.js
index 8cf845b..8d5f2c7 100644
--- a/src/main/resources/static/src/consumer.js
+++ b/src/main/resources/static/src/consumer.js
@@ -70,6 +70,34 @@ module.controller('consumerController', ['$scope',
'ngDialog', '$http', 'Notific
}
$scope.filterList($scope.paginationConf.currentPage)
};
+ $scope.refreshConsumerGroup = function (groupName) {
+ //Show loader
+ $('#loaderConsumer').removeClass("hide-myloader");
+
+ $http({
+ method: "GET",
+ url: "/consumer/group.refresh",
+ params: {
+ address: $scope.isRmqVersionV5() ?
localStorage.getItem('proxyAddr') : null,
+ consumerGroup: groupName
+ }
+ }).success(function (resp) {
+ if (resp.status == 0) {
+ for (var i = 0; i < $scope.allConsumerGrouopList.length; i++) {
+ if ($scope.allConsumerGrouopList[i].group === groupName) {
+ $scope.allConsumerGrouopList[i] = resp.data;
+ break;
+ }
+ }
+
$scope.showConsumerGroupList($scope.paginationConf.currentPage,
$scope.allConsumerGrouopList.length);
+ //Hide loader
+ $('#loaderConsumer').addClass("hide-myloader");
+ } else {
+ Notification.error({message: resp.errMsg, delay: 2000});
+ }
+ });
+ }
+
$scope.refreshConsumerData = function () {
//Show loader
$('#loaderConsumer').removeClass("hide-myloader");
@@ -421,4 +449,4 @@ module.controller('consumerTopicViewDialogController',
['$scope', 'ngDialog', '$
});
};
}]
-);
\ No newline at end of file
+);
diff --git a/src/main/resources/static/src/i18n/en.js
b/src/main/resources/static/src/i18n/en.js
index 2c1450d..7fbb042 100644
--- a/src/main/resources/static/src/i18n/en.js
+++ b/src/main/resources/static/src/i18n/en.js
@@ -135,4 +135,5 @@ var en = {
"MESSAGE_TYPE_FIFO": "FIFO",
"MESSAGE_TYPE_DELAY": "DELAY",
"MESSAGE_TYPE_TRANSACTION": "TRANSACTION",
+ "UPDATE_TIME": "Update Time",
}
diff --git a/src/main/resources/static/src/i18n/zh.js
b/src/main/resources/static/src/i18n/zh.js
index 2f0e6f3..ec2ebdd 100644
--- a/src/main/resources/static/src/i18n/zh.js
+++ b/src/main/resources/static/src/i18n/zh.js
@@ -136,4 +136,5 @@ var zh = {
"MESSAGE_TYPE_FIFO": "顺序消息",
"MESSAGE_TYPE_DELAY": "定时/延时消息",
"MESSAGE_TYPE_TRANSACTION": "事务消息",
-}
\ No newline at end of file
+ "UPDATE_TIME": "更新时间",
+}
diff --git a/src/main/resources/static/src/message.js
b/src/main/resources/static/src/message.js
index a496285..c980d9e 100644
--- a/src/main/resources/static/src/message.js
+++ b/src/main/resources/static/src/message.js
@@ -277,4 +277,4 @@ module.controller('messageDetailViewDialogController',
['$scope', 'ngDialog', '$
$scope.messageTrackShowList = canShowList;
});
}]
-);
\ No newline at end of file
+);
diff --git a/src/main/resources/static/src/topic.js
b/src/main/resources/static/src/topic.js
index 7ad997c..13c3dbb 100644
--- a/src/main/resources/static/src/topic.js
+++ b/src/main/resources/static/src/topic.js
@@ -59,7 +59,7 @@ module.controller('topicController', ['$scope', 'ngDialog',
'$http', 'Notificati
$scope.userRole = $window.sessionStorage.getItem("userrole");
$scope.writeOperationEnabled = $scope.userRole == null ? true :
($scope.userRole == 1 ? true : false);
- $scope.refreshTopicList = function () {
+ $scope.getTopicList = function () {
$http({
method: "GET",
url: "topic/list.queryTopicType"
@@ -77,7 +77,34 @@ module.controller('topicController', ['$scope', 'ngDialog',
'$http', 'Notificati
});
};
- $scope.refreshTopicList();
+ $scope.refreshTopicList = function () {
+ $http({
+ method: "POST",
+ url: "topic/refresh"
+ }).success(function (resp) {
+ if (resp.status == 0 && resp.data == true) {
+ $http({
+ method: "GET",
+ url: "topic/list.queryTopicType"
+ }).success(function (resp1) {
+ if (resp1.status == 0) {
+ $scope.allTopicNameList = resp1.data.topicNameList;
+ $scope.allMessageTypeList = resp1.data.messageTypeList;
+ console.log($scope.allTopicNameList);
+ console.log(JSON.stringify(resp1));
+ $scope.showTopicList(1,
$scope.allTopicNameList.length);
+ } else {
+ Notification.error({message: resp1.errMsg, delay:
5000});
+ }
+ });
+
+ } else {
+ Notification.error({message: resp.errMsg, delay: 5000});
+ }
+ });
+ };
+
+ $scope.getTopicList();
$scope.filterStr = "";
$scope.$watch('filterStr', function () {
@@ -127,17 +154,17 @@ module.controller('topicController', ['$scope',
'ngDialog', '$http', 'Notificati
$scope.filterByType = function (str, type) {
if ($scope.filterRetry) {
- if (str.startsWith("%R")) {
+ if (type.includes("RETRY")) {
return true
}
}
if ($scope.filterDLQ) {
- if (str.startsWith("%D")) {
+ if (type.includes("DLQ")) {
return true
}
}
if ($scope.filterSystem) {
- if (str.startsWith("%S")) {
+ if (type.includes("SYSTEM")) {
return true
}
}
@@ -386,10 +413,6 @@ module.controller('topicController', ['$scope',
'ngDialog', '$http', 'Notificati
if (resp.status == 0) {
console.log(resp);
ngDialog.open({
- preCloseCallback: function (value) {
- // Refresh topic list
- $scope.refreshTopicList();
- },
template: 'topicModifyDialog',
controller: 'topicModifyDialogController',
data: {
@@ -540,4 +563,4 @@ module.controller('routerViewDialogController', ['$scope',
'ngDialog', '$http',
})
};
}]
-);
\ No newline at end of file
+);
diff --git a/src/main/resources/static/view/pages/consumer.html
b/src/main/resources/static/view/pages/consumer.html
index 68c7786..c187f72 100644
--- a/src/main/resources/static/view/pages/consumer.html
+++ b/src/main/resources/static/view/pages/consumer.html
@@ -33,9 +33,6 @@
<button class="btn btn-raised btn-sm btn-primary"
type="button" ng-show="{{writeOperationEnabled}}"
ng-click="openAddDialog()">{{'ADD' | translate}}/
{{'UPDATE' | translate}}
</button>
- <button class="btn btn-raised btn-sm btn-primary"
type="button" ng-click="refreshConsumerData()">
- {{'REFRESH' | translate}}
- </button>
<md-switch class="md-primary" md-no-ink aria-label="Switch No
Ink" ng-model="intervalProcessSwitch">
{{'AUTO_REFRESH' | translate}}
</md-switch>
@@ -53,6 +50,7 @@
<th class="text-center">{{ 'MODE' | translate}}</th>
<th class="text-center"><a
ng-click="sortByKey('consumeTps')">TPS</a></th>
<th class="text-center"><a
ng-click="sortByKey('diffTotal')">{{ 'DELAY' | translate}}</a></th>
+ <th class="text-center">{{ 'UPDATE_TIME' |
translate}}</th>
<th class="text-center">{{ 'OPERATION' |
translate}}</th>
</tr>
<tr ng-repeat="consumerGroup in consumerGroupShowList"
@@ -65,6 +63,7 @@
<td
class="text-center">{{consumerGroup.messageModel}}</td>
<td
class="text-center">{{consumerGroup.consumeTps}}</td>
<td
class="text-center">{{consumerGroup.diffTotal}}</td>
+ <td
class="text-center">{{consumerGroup.updateTime}}</td>
<td class="text-left">
<button name="client"
ng-click="client(consumerGroup.group, consumerGroup.address)"
class="btn btn-raised btn-sm btn-primary"
@@ -85,6 +84,9 @@
ng-show="{{!sysFlag &&
writeOperationEnabled}}"
type="button">{{'DELETE' | translate}}
</button>
+ <button class="btn btn-raised btn-sm btn-primary"
type="button" ng-click="refreshConsumerGroup(consumerGroup.group)">
+ {{'REFRESH' | translate}}
+ </button>
</td>
</tr>
@@ -568,4 +570,4 @@
</div>
</div>
</div>
-</script>
\ No newline at end of file
+</script>