This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-dashboard.git


The following commit(s) were added to refs/heads/master by this push:
     new bbabd1c  [ISSUES #281 #274 #285 #287] Speeds up topic and consumer 
queries, adds caching, and fixes delay/dead-letter topic mix-up (#286)
bbabd1c is described below

commit bbabd1cd0df299c8c59d44344ffe4c369c6eef13
Author: Xu Yichi <[email protected]>
AuthorDate: Sun Apr 13 19:41:24 2025 +0800

    [ISSUES #281 #274 #285 #287] Speeds up topic and consumer queries, adds 
caching, and fixes delay/dead-letter topic mix-up (#286)
    
    * fix: Resolve query failures that occur with a large number of topics and 
consumers (apache#281)
    
    * fix: Expand the message ID query time range to avoid query failure
    
    * fix: Remove duplicates from topic queries and improve system topic 
recognition (#287)
---
 .../dashboard/controller/ConsumerController.java   |   7 +
 .../dashboard/controller/TopicController.java      |   6 +
 .../rocketmq/dashboard/model/GroupConsumeInfo.java |  10 +
 .../dashboard/service/ClusterInfoService.java      |  72 +++++++
 .../dashboard/service/ConsumerService.java         |   8 +-
 .../rocketmq/dashboard/service/TopicService.java   |   1 +
 .../dashboard/service/client/MQAdminExtImpl.java   |  32 +++-
 .../service/impl/ConsumerServiceImpl.java          | 206 +++++++++++++--------
 .../dashboard/service/impl/TopicServiceImpl.java   | 142 ++++++++++----
 .../dashboard/task/DashboardCollectTask.java       |  11 ++
 src/main/resources/static/src/consumer.js          |  30 ++-
 src/main/resources/static/src/i18n/en.js           |   1 +
 src/main/resources/static/src/i18n/zh.js           |   3 +-
 src/main/resources/static/src/message.js           |   2 +-
 src/main/resources/static/src/topic.js             |  43 ++++-
 src/main/resources/static/view/pages/consumer.html |  10 +-
 16 files changed, 444 insertions(+), 140 deletions(-)

diff --git 
a/src/main/java/org/apache/rocketmq/dashboard/controller/ConsumerController.java
 
b/src/main/java/org/apache/rocketmq/dashboard/controller/ConsumerController.java
index 96fc056..cf4a210 100644
--- 
a/src/main/java/org/apache/rocketmq/dashboard/controller/ConsumerController.java
+++ 
b/src/main/java/org/apache/rocketmq/dashboard/controller/ConsumerController.java
@@ -51,6 +51,13 @@ public class ConsumerController {
         return consumerService.queryGroupList(skipSysGroup, address);
     }
 
+    @RequestMapping(value = "/group.refresh")
+    @ResponseBody
+    public Object refresh(String address,
+        String consumerGroup) {
+        return consumerService.refreshGroup(address, consumerGroup);
+    }
+
     @RequestMapping(value = "/group.query")
     @ResponseBody
     public Object groupQuery(@RequestParam String consumerGroup, String 
address) {
diff --git 
a/src/main/java/org/apache/rocketmq/dashboard/controller/TopicController.java 
b/src/main/java/org/apache/rocketmq/dashboard/controller/TopicController.java
index 467c18e..665a80a 100644
--- 
a/src/main/java/org/apache/rocketmq/dashboard/controller/TopicController.java
+++ 
b/src/main/java/org/apache/rocketmq/dashboard/controller/TopicController.java
@@ -56,6 +56,12 @@ public class TopicController {
         return topicService.fetchAllTopicList(skipSysProcess, skipRetryAndDlq);
     }
 
+    @RequestMapping(value = "/refresh", method = {RequestMethod.POST})
+    @ResponseBody
+    public Object refresh() {
+        return topicService.refreshTopicList();
+    }
+
     @RequestMapping(value = "/list.queryTopicType", method = RequestMethod.GET)
     @ResponseBody
     public Object listTopicType() {
diff --git 
a/src/main/java/org/apache/rocketmq/dashboard/model/GroupConsumeInfo.java 
b/src/main/java/org/apache/rocketmq/dashboard/model/GroupConsumeInfo.java
index db11c41..358d02e 100644
--- a/src/main/java/org/apache/rocketmq/dashboard/model/GroupConsumeInfo.java
+++ b/src/main/java/org/apache/rocketmq/dashboard/model/GroupConsumeInfo.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.dashboard.model;
 import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
 import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
 
+import java.util.Date;
 import java.util.List;
 
 public class GroupConsumeInfo implements Comparable<GroupConsumeInfo> {
@@ -31,6 +32,7 @@ public class GroupConsumeInfo implements 
Comparable<GroupConsumeInfo> {
     private int consumeTps;
     private long diffTotal = -1;
     private String subGroupType = "NORMAL";
+    private Date updateTime;
 
 
     public String getGroup() {
@@ -112,4 +114,12 @@ public class GroupConsumeInfo implements 
Comparable<GroupConsumeInfo> {
     public void setVersion(String version) {
         this.version = version;
     }
+
+    public Date getUpdateTime() {
+        return updateTime;
+    }
+
+    public void setUpdateTime(Date updateTime) {
+        this.updateTime = updateTime;
+    }
 }
diff --git 
a/src/main/java/org/apache/rocketmq/dashboard/service/ClusterInfoService.java 
b/src/main/java/org/apache/rocketmq/dashboard/service/ClusterInfoService.java
new file mode 100644
index 0000000..3dc12b3
--- /dev/null
+++ 
b/src/main/java/org/apache/rocketmq/dashboard/service/ClusterInfoService.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.dashboard.service;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
+import org.apache.rocketmq.tools.admin.MQAdminExt;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+@Slf4j
+@Service
+public class ClusterInfoService {
+
+    @Resource
+    private MQAdminExt mqAdminExt;
+
+    @Value("${rocketmq.cluster.cache.expire:60000}")
+    private long cacheExpireMs;
+
+
+    private final ScheduledExecutorService scheduler = 
Executors.newSingleThreadScheduledExecutor();
+    private final AtomicReference<ClusterInfo> cachedRef = new 
AtomicReference<>();
+
+
+    @PostConstruct
+    public void init() {
+        scheduler.scheduleAtFixedRate(this::refresh,
+                0, cacheExpireMs / 2, TimeUnit.MILLISECONDS);
+    }
+
+    public ClusterInfo get() {
+        ClusterInfo info = cachedRef.get();
+        return info != null ? info : refresh();
+    }
+
+    public synchronized ClusterInfo refresh() {
+        try {
+            ClusterInfo fresh = mqAdminExt.examineBrokerClusterInfo();
+            cachedRef.set(fresh);
+            return fresh;
+        } catch (Exception e) {
+            log.warn("Refresh cluster info failed", e);
+            ClusterInfo old = cachedRef.get();
+            if (old != null) {
+                return old;
+            }
+            throw new IllegalStateException("No cluster info available", e);
+        }
+    }
+}
diff --git 
a/src/main/java/org/apache/rocketmq/dashboard/service/ConsumerService.java 
b/src/main/java/org/apache/rocketmq/dashboard/service/ConsumerService.java
index e284c44..001a184 100644
--- a/src/main/java/org/apache/rocketmq/dashboard/service/ConsumerService.java
+++ b/src/main/java/org/apache/rocketmq/dashboard/service/ConsumerService.java
@@ -17,14 +17,14 @@
 
 package org.apache.rocketmq.dashboard.service;
 
+import org.apache.rocketmq.dashboard.model.request.ConsumerConfigInfo;
+import org.apache.rocketmq.dashboard.model.request.DeleteSubGroupRequest;
+import org.apache.rocketmq.dashboard.model.request.ResetOffsetRequest;
 import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
 import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
 import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat;
 import org.apache.rocketmq.dashboard.model.GroupConsumeInfo;
 import org.apache.rocketmq.dashboard.model.TopicConsumerInfo;
-import org.apache.rocketmq.dashboard.model.request.ConsumerConfigInfo;
-import org.apache.rocketmq.dashboard.model.request.DeleteSubGroupRequest;
-import org.apache.rocketmq.dashboard.model.request.ResetOffsetRequest;
 
 import java.util.List;
 import java.util.Map;
@@ -55,4 +55,6 @@ public interface ConsumerService {
     ConsumerConnection getConsumerConnection(String consumerGroup, String 
address);
 
     ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String 
clientId, boolean jstack);
+
+    Object refreshGroup(String address, String consumerGroup);
 }
diff --git 
a/src/main/java/org/apache/rocketmq/dashboard/service/TopicService.java 
b/src/main/java/org/apache/rocketmq/dashboard/service/TopicService.java
index 9ff0bf0..b0f4814 100644
--- a/src/main/java/org/apache/rocketmq/dashboard/service/TopicService.java
+++ b/src/main/java/org/apache/rocketmq/dashboard/service/TopicService.java
@@ -54,4 +54,5 @@ public interface TopicService {
 
     SendResult sendTopicMessageRequest(SendTopicMessageRequest 
sendTopicMessageRequest);
 
+    boolean refreshTopicList();
 }
diff --git 
a/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java
 
b/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java
index 83143c3..146f9e5 100644
--- 
a/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java
+++ 
b/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java
@@ -17,11 +17,15 @@
 package org.apache.rocketmq.dashboard.service.client;
 
 import com.google.common.base.Throwables;
+
 import java.io.UnsupportedEncodingException;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
 import org.apache.rocketmq.client.QueryResult;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
@@ -87,9 +91,15 @@ import static 
org.apache.rocketmq.remoting.protocol.RemotingSerializable.decode;
 public class MQAdminExtImpl implements MQAdminExt {
     private Logger logger = LoggerFactory.getLogger(MQAdminExtImpl.class);
 
-    public MQAdminExtImpl() {
+    private static final ConcurrentMap<String, TopicConfigSerializeWrapper> 
TOPIC_CONFIG_CACHE = new ConcurrentHashMap<>();
+
+    public MQAdminExtImpl() {}
+
+    public static void clearTopicConfigCache() {
+        TOPIC_CONFIG_CACHE.clear();
     }
 
+
     @Override
     public void updateBrokerConfig(String brokerAddr, Properties properties)
         throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException,
@@ -145,7 +155,7 @@ public class MQAdminExtImpl implements MQAdminExt {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG,
 null);
         RemotingCommand response = null;
         try {
-            response = remotingClient.invokeSync(addr, request, 3000);
+            response = remotingClient.invokeSync(addr, request, 8000);
         }
         catch (Exception err) {
             Throwables.throwIfUnchecked(err);
@@ -164,19 +174,27 @@ public class MQAdminExtImpl implements MQAdminExt {
 
     @Override
     public TopicConfig examineTopicConfig(String addr, String topic) throws 
MQBrokerException {
+        TopicConfigSerializeWrapper cachedWrapper = 
TOPIC_CONFIG_CACHE.get(addr);
+
+        if (cachedWrapper != null && 
cachedWrapper.getTopicConfigTable().containsKey(topic)) {
+            return cachedWrapper.getTopicConfigTable().get(topic);
+        }
+
         RemotingClient remotingClient = 
MQAdminInstance.threadLocalRemotingClient();
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
         RemotingCommand response = null;
         try {
             response = remotingClient.invokeSync(addr, request, 3000);
-        }
-        catch (Exception err) {
+        } catch (Exception err) {
             Throwables.throwIfUnchecked(err);
             throw new RuntimeException(err);
         }
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
-                TopicConfigSerializeWrapper topicConfigSerializeWrapper = 
decode(response.getBody(), TopicConfigSerializeWrapper.class);
+                TopicConfigSerializeWrapper topicConfigSerializeWrapper =
+                        decode(response.getBody(), 
TopicConfigSerializeWrapper.class);
+
+                TOPIC_CONFIG_CACHE.put(addr, topicConfigSerializeWrapper);
                 return 
topicConfigSerializeWrapper.getTopicConfigTable().get(topic);
             }
             default:
@@ -468,14 +486,14 @@ public class MQAdminExtImpl implements MQAdminExt {
         Set<String> clusterList = 
MQAdminInstance.threadLocalMQAdminExt().getTopicClusterList(topic);
         if (clusterList == null || clusterList.isEmpty()) {
             QueryResult qr = Reflect.on(mqAdminImpl).call("queryMessage", "", 
topic, msgId, 32,
-                    
MessageClientIDSetter.getNearlyTimeFromID(msgId).getTime(), Long.MAX_VALUE, 
true).get();
+                    0L, Long.MAX_VALUE, true).get();
             if (qr != null && qr.getMessageList() != null && 
!qr.getMessageList().isEmpty()) {
                 return qr.getMessageList().get(0);
             }
         } else {
             for (String name : clusterList) {
                 QueryResult qr = Reflect.on(mqAdminImpl).call("queryMessage", 
name, topic, msgId, 32,
-                        
MessageClientIDSetter.getNearlyTimeFromID(msgId).getTime(), Long.MAX_VALUE, 
true).get();
+                        0L, Long.MAX_VALUE, true).get();
                 if (qr != null && qr.getMessageList() != null && 
!qr.getMessageList().isEmpty()) {
                     return qr.getMessageList().get(0);
                 }
diff --git 
a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java
 
b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java
index 9bc37ab..2f81582 100644
--- 
a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java
+++ 
b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java
@@ -23,14 +23,18 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -41,11 +45,16 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import javax.annotation.Resource;
+
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.dashboard.model.request.ConsumerConfigInfo;
+import org.apache.rocketmq.dashboard.model.request.DeleteSubGroupRequest;
+import org.apache.rocketmq.dashboard.model.request.ResetOffsetRequest;
+import org.apache.rocketmq.dashboard.service.ClusterInfoService;
 import org.apache.rocketmq.dashboard.service.client.ProxyAdmin;
 import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
 import org.apache.rocketmq.remoting.protocol.admin.RollbackStats;
@@ -65,15 +74,13 @@ import 
org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat;
 import org.apache.rocketmq.dashboard.model.GroupConsumeInfo;
 import org.apache.rocketmq.dashboard.model.QueueStatInfo;
 import org.apache.rocketmq.dashboard.model.TopicConsumerInfo;
-import org.apache.rocketmq.dashboard.model.request.ConsumerConfigInfo;
-import org.apache.rocketmq.dashboard.model.request.DeleteSubGroupRequest;
-import org.apache.rocketmq.dashboard.model.request.ResetOffsetRequest;
 import org.apache.rocketmq.dashboard.service.AbstractCommonService;
 import org.apache.rocketmq.dashboard.service.ConsumerService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.DisposableBean;
 import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 @Service
@@ -85,10 +92,17 @@ public class ConsumerServiceImpl extends 
AbstractCommonService implements Consum
     @Resource
     private RMQConfigure configure;
 
+    @Autowired
+    private ClusterInfoService clusterInfoService;
+
+    private volatile boolean isCacheBeingBuilt = false;
+
     private static final Set<String> SYSTEM_GROUP_SET = new HashSet<>();
 
     private ExecutorService executorService;
 
+    private final List<GroupConsumeInfo> cacheConsumeInfoList = 
Collections.synchronizedList(new ArrayList<>());
+
     @Override
     public void afterPropertiesSet() {
         Runtime runtime = Runtime.getRuntime();
@@ -104,7 +118,7 @@ public class ConsumerServiceImpl extends 
AbstractCommonService implements Consum
         };
         RejectedExecutionHandler handler = new 
ThreadPoolExecutor.DiscardOldestPolicy();
         this.executorService = new ThreadPoolExecutor(corePoolSize, 
maximumPoolSize, 60L, TimeUnit.SECONDS,
-            new LinkedBlockingQueue<>(5000), threadFactory, handler);
+                new LinkedBlockingQueue<>(5000), threadFactory, handler);
     }
 
     @Override
@@ -125,11 +139,47 @@ public class ConsumerServiceImpl extends 
AbstractCommonService implements Consum
 
     @Override
     public List<GroupConsumeInfo> queryGroupList(boolean skipSysGroup, String 
address) {
+        if (isCacheBeingBuilt) {
+            throw new RuntimeException("Cache is being built, please try again 
later");
+        }
+
+        synchronized (this) {
+            if (cacheConsumeInfoList.isEmpty() && !isCacheBeingBuilt) {
+                isCacheBeingBuilt = true;
+                try {
+                    makeGroupListCache();
+                } finally {
+                    isCacheBeingBuilt = false;
+                }
+            }
+        }
+
+        if (cacheConsumeInfoList.isEmpty()) {
+            throw new RuntimeException("No consumer group information 
available");
+        }
+
+        List<GroupConsumeInfo> groupConsumeInfoList = new 
ArrayList<>(cacheConsumeInfoList);
+
+        if (!skipSysGroup) {
+            groupConsumeInfoList.stream().map(group -> {
+                if (SYSTEM_GROUP_SET.contains(group.getGroup())) {
+                    group.setGroup(String.format("%s%s", "%SYS%", 
group.getGroup()));
+                }
+                return group;
+            }).collect(Collectors.toList());
+        }
+        Collections.sort(groupConsumeInfoList);
+        return groupConsumeInfoList;
+    }
+
+
+    public void makeGroupListCache() {
         HashMap<String, List<String>> consumerGroupMap = Maps.newHashMap();
+        SubscriptionGroupWrapper subscriptionGroupWrapper = null;
         try {
-            ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
+            ClusterInfo clusterInfo = clusterInfoService.get();
             for (BrokerData brokerData : 
clusterInfo.getBrokerAddrTable().values()) {
-                SubscriptionGroupWrapper subscriptionGroupWrapper = 
mqAdminExt.getAllSubscriptionGroup(brokerData.selectBrokerAddr(), 3000L);
+                subscriptionGroupWrapper = 
mqAdminExt.getAllSubscriptionGroup(brokerData.selectBrokerAddr(), 3000L);
                 for (String groupName : 
subscriptionGroupWrapper.getSubscriptionGroupTable().keySet()) {
                     if (!consumerGroupMap.containsKey(groupName)) {
                         consumerGroupMap.putIfAbsent(groupName, new 
ArrayList<>());
@@ -143,14 +193,28 @@ public class ConsumerServiceImpl extends 
AbstractCommonService implements Consum
             Throwables.throwIfUnchecked(err);
             throw new RuntimeException(err);
         }
+
+        if (subscriptionGroupWrapper != null && 
subscriptionGroupWrapper.getSubscriptionGroupTable().isEmpty()) {
+            logger.warn("No subscription group information available");
+            isCacheBeingBuilt = false;
+            return;
+        }
+        final ConcurrentMap<String, SubscriptionGroupConfig> 
subscriptionGroupTable = subscriptionGroupWrapper.getSubscriptionGroupTable();
         List<GroupConsumeInfo> groupConsumeInfoList = 
Collections.synchronizedList(Lists.newArrayList());
         CountDownLatch countDownLatch = new 
CountDownLatch(consumerGroupMap.size());
         for (Map.Entry<String, List<String>> entry : 
consumerGroupMap.entrySet()) {
             String consumerGroup = entry.getKey();
             executorService.submit(() -> {
                 try {
-                    GroupConsumeInfo consumeInfo = queryGroup(consumerGroup, 
address);
+                    GroupConsumeInfo consumeInfo = queryGroup(consumerGroup, 
"");
                     consumeInfo.setAddress(entry.getValue());
+                    if (SYSTEM_GROUP_SET.contains(consumerGroup)) {
+                        consumeInfo.setSubGroupType("SYSTEM");
+                    } else {
+                        
consumeInfo.setSubGroupType(subscriptionGroupTable.get(consumerGroup).isConsumeMessageOrderly()
 ? "FIFO" : "NORMAL");
+                    }
+                    consumeInfo.setGroup(consumerGroup);
+                    consumeInfo.setUpdateTime(new Date());
                     groupConsumeInfoList.add(consumeInfo);
                 } catch (Exception e) {
                     logger.error("queryGroup exception, consumerGroup: {}", 
consumerGroup, e);
@@ -160,21 +224,17 @@ public class ConsumerServiceImpl extends 
AbstractCommonService implements Consum
             });
         }
         try {
-            countDownLatch.await(30, TimeUnit.SECONDS);
+            countDownLatch.await();
         } catch (InterruptedException e) {
-            logger.error("query consumerGroup countDownLatch await Exception", 
e);
-        }
-
-        if (!skipSysGroup) {
-            groupConsumeInfoList.stream().map(group -> {
-                if (SYSTEM_GROUP_SET.contains(group.getGroup())) {
-                    group.setGroup(String.format("%s%s", "%SYS%", 
group.getGroup()));
-                }
-                return group;
-            }).collect(Collectors.toList());
+            Thread.currentThread().interrupt();
+            logger.error("Interruption occurred while waiting for task 
completion", e);
         }
+        logger.info("All consumer group query tasks have been completed");
+        isCacheBeingBuilt = false;
         Collections.sort(groupConsumeInfoList);
-        return groupConsumeInfoList;
+
+        cacheConsumeInfoList.clear();
+        cacheConsumeInfoList.addAll(groupConsumeInfoList);
     }
 
     @Override
@@ -184,16 +244,14 @@ public class ConsumerServiceImpl extends 
AbstractCommonService implements Consum
             ConsumeStats consumeStats = null;
             try {
                 consumeStats = mqAdminExt.examineConsumeStats(consumerGroup);
-            }
-            catch (Exception e) {
+            } catch (Exception e) {
                 logger.warn("examineConsumeStats exception to consumerGroup 
{}, response [{}]", consumerGroup, e.getMessage());
             }
-
+            if (consumeStats != null) {
+                groupConsumeInfo.setConsumeTps((int) 
consumeStats.getConsumeTps());
+                groupConsumeInfo.setDiffTotal(consumeStats.computeTotalDiff());
+            }
             ConsumerConnection consumerConnection = null;
-            boolean isFifoType = examineSubscriptionGroupConfig(consumerGroup)
-                    
.stream().map(ConsumerConfigInfo::getSubscriptionGroupConfig)
-                    
.allMatch(SubscriptionGroupConfig::isConsumeMessageOrderly);
-
             try {
                 if (StringUtils.isNotEmpty(address)) {
                     consumerConnection = 
proxyAdmin.examineConsumerConnectionInfo(address, consumerGroup);
@@ -203,31 +261,15 @@ public class ConsumerServiceImpl extends 
AbstractCommonService implements Consum
             } catch (Exception e) {
                 logger.warn("examineConsumeStats exception to consumerGroup 
{}, response [{}]", consumerGroup, e.getMessage());
             }
-
-            groupConsumeInfo.setGroup(consumerGroup);
-            if (SYSTEM_GROUP_SET.contains(consumerGroup)) {
-                groupConsumeInfo.setSubGroupType("SYSTEM");
-            } else if (isFifoType) {
-                groupConsumeInfo.setSubGroupType("FIFO");
-            } else {
-                groupConsumeInfo.setSubGroupType("NORMAL");
-            }
-
-            if (consumeStats != null) {
-                
groupConsumeInfo.setConsumeTps((int)consumeStats.getConsumeTps());
-                groupConsumeInfo.setDiffTotal(consumeStats.computeTotalDiff());
-            }
-
             if (consumerConnection != null) {
                 
groupConsumeInfo.setCount(consumerConnection.getConnectionSet().size());
                 
groupConsumeInfo.setMessageModel(consumerConnection.getMessageModel());
                 
groupConsumeInfo.setConsumeType(consumerConnection.getConsumeType());
                 
groupConsumeInfo.setVersion(MQVersion.getVersionDesc(consumerConnection.computeMinVersion()));
             }
-        }
-        catch (Exception e) {
+        } catch (Exception e) {
             logger.warn("examineConsumeStats or examineConsumerConnectionInfo 
exception, "
-                + consumerGroup, e);
+                    + consumerGroup, e);
         }
         return groupConsumeInfo;
     }
@@ -252,8 +294,7 @@ public class ConsumerServiceImpl extends 
AbstractCommonService implements Consum
         ConsumeStats consumeStats = null;
         try {
             consumeStats = mqAdminExt.examineConsumeStats(groupName, topic);
-        }
-        catch (Exception e) {
+        } catch (Exception e) {
             Throwables.throwIfUnchecked(e);
             throw new RuntimeException(e);
         }
@@ -295,8 +336,7 @@ public class ConsumerServiceImpl extends 
AbstractCommonService implements Consum
                     results.put(messageQueue, clinetId);
                 }
             }
-        }
-        catch (Exception err) {
+        } catch (Exception err) {
             logger.error("op=getClientConnection_error", err);
         }
         return results;
@@ -311,14 +351,12 @@ public class ConsumerServiceImpl extends 
AbstractCommonService implements Consum
                 List<TopicConsumerInfo> topicConsumerInfoList = null;
                 try {
                     topicConsumerInfoList = queryConsumeStatsList(topic, 
group);
-                }
-                catch (Exception ignore) {
+                } catch (Exception ignore) {
                 }
                 group2ConsumerInfoMap.put(group, 
CollectionUtils.isEmpty(topicConsumerInfoList) ? new TopicConsumerInfo(topic) : 
topicConsumerInfoList.get(0));
             }
             return group2ConsumerInfoMap;
-        }
-        catch (Exception e) {
+        } catch (Exception e) {
             Throwables.throwIfUnchecked(e);
             throw new RuntimeException(e);
         }
@@ -330,7 +368,7 @@ public class ConsumerServiceImpl extends 
AbstractCommonService implements Consum
         for (String consumerGroup : resetOffsetRequest.getConsumerGroupList()) 
{
             try {
                 Map<MessageQueue, Long> rollbackStatsMap =
-                    
mqAdminExt.resetOffsetByTimestamp(resetOffsetRequest.getTopic(), consumerGroup, 
resetOffsetRequest.getResetTime(), resetOffsetRequest.isForce());
+                        
mqAdminExt.resetOffsetByTimestamp(resetOffsetRequest.getTopic(), consumerGroup, 
resetOffsetRequest.getResetTime(), resetOffsetRequest.isForce());
                 ConsumerGroupRollBackStat consumerGroupRollBackStat = new 
ConsumerGroupRollBackStat(true);
                 List<RollbackStats> rollbackStatsList = 
consumerGroupRollBackStat.getRollbackStatsList();
                 for (Map.Entry<MessageQueue, Long> rollbackStatsEntty : 
rollbackStatsMap.entrySet()) {
@@ -341,8 +379,7 @@ public class ConsumerServiceImpl extends 
AbstractCommonService implements Consum
                     rollbackStatsList.add(rollbackStats);
                 }
                 groupRollbackStats.put(consumerGroup, 
consumerGroupRollBackStat);
-            }
-            catch (MQClientException e) {
+            } catch (MQClientException e) {
                 if (ResponseCode.CONSUMER_NOT_ONLINE == e.getResponseCode()) {
                     try {
                         ConsumerGroupRollBackStat consumerGroupRollBackStat = 
new ConsumerGroupRollBackStat(true);
@@ -350,17 +387,14 @@ public class ConsumerServiceImpl extends 
AbstractCommonService implements Consum
                         
consumerGroupRollBackStat.setRollbackStatsList(rollbackStatsList);
                         groupRollbackStats.put(consumerGroup, 
consumerGroupRollBackStat);
                         continue;
-                    }
-                    catch (Exception err) {
+                    } catch (Exception err) {
                         logger.error("op=resetOffset_which_not_online_error", 
err);
                     }
-                }
-                else {
+                } else {
                     logger.error("op=resetOffset_error", e);
                 }
                 groupRollbackStats.put(consumerGroup, new 
ConsumerGroupRollBackStat(false, e.getMessage()));
-            }
-            catch (Exception e) {
+            } catch (Exception e) {
                 logger.error("op=resetOffset_error", e);
                 groupRollbackStats.put(consumerGroup, new 
ConsumerGroupRollBackStat(false, e.getMessage()));
             }
@@ -372,17 +406,21 @@ public class ConsumerServiceImpl extends 
AbstractCommonService implements Consum
     public List<ConsumerConfigInfo> examineSubscriptionGroupConfig(String 
group) {
         List<ConsumerConfigInfo> consumerConfigInfoList = Lists.newArrayList();
         try {
-            ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
+            ClusterInfo clusterInfo = clusterInfoService.get();
             for (String brokerName : 
clusterInfo.getBrokerAddrTable().keySet()) { //foreach brokerName
                 String brokerAddress = 
clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr();
-                SubscriptionGroupConfig subscriptionGroupConfig = 
mqAdminExt.examineSubscriptionGroupConfig(brokerAddress, group);
+                SubscriptionGroupConfig subscriptionGroupConfig = null;
+                try {
+                    subscriptionGroupConfig = 
mqAdminExt.examineSubscriptionGroupConfig(brokerAddress, group);
+                } catch (Exception e) {
+                    logger.warn("op=examineSubscriptionGroupConfig_error 
brokerName={} group={}", brokerName, group);
+                }
                 if (subscriptionGroupConfig == null) {
                     continue;
                 }
                 consumerConfigInfoList.add(new 
ConsumerConfigInfo(Lists.newArrayList(brokerName), subscriptionGroupConfig));
             }
-        }
-        catch (Exception e) {
+        } catch (Exception e) {
             Throwables.throwIfUnchecked(e);
             throw new RuntimeException(e);
         }
@@ -399,7 +437,7 @@ public class ConsumerServiceImpl extends 
AbstractCommonService implements Consum
             deleteInNsFlag = true;
         }
         try {
-            ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
+            ClusterInfo clusterInfo = clusterInfoService.get();
             for (String brokerName : 
deleteSubGroupRequest.getBrokerNameList()) {
                 logger.info("addr={} groupName={}", 
clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), 
deleteSubGroupRequest.getGroupName());
                 
mqAdminExt.deleteSubscriptionGroup(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(),
 deleteSubGroupRequest.getGroupName(), true);
@@ -407,8 +445,7 @@ public class ConsumerServiceImpl extends 
AbstractCommonService implements Consum
                 deleteResources(MixAll.RETRY_GROUP_TOPIC_PREFIX + 
deleteSubGroupRequest.getGroupName(), brokerName, clusterInfo, deleteInNsFlag);
                 deleteResources(MixAll.DLQ_GROUP_TOPIC_PREFIX + 
deleteSubGroupRequest.getGroupName(), brokerName, clusterInfo, deleteInNsFlag);
             }
-        }
-        catch (Exception e) {
+        } catch (Exception e) {
             Throwables.throwIfUnchecked(e);
             throw new RuntimeException(e);
         }
@@ -430,13 +467,12 @@ public class ConsumerServiceImpl extends 
AbstractCommonService implements Consum
     @Override
     public boolean createAndUpdateSubscriptionGroupConfig(ConsumerConfigInfo 
consumerConfigInfo) {
         try {
-            ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
+            ClusterInfo clusterInfo = clusterInfoService.get();
             for (String brokerName : 
changeToBrokerNameSet(clusterInfo.getClusterAddrTable(),
-                consumerConfigInfo.getClusterNameList(), 
consumerConfigInfo.getBrokerNameList())) {
+                    consumerConfigInfo.getClusterNameList(), 
consumerConfigInfo.getBrokerNameList())) {
                 
mqAdminExt.createAndUpdateSubscriptionGroupConfig(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(),
 consumerConfigInfo.getSubscriptionGroupConfig());
             }
-        }
-        catch (Exception err) {
+        } catch (Exception err) {
             Throwables.throwIfUnchecked(err);
             throw new RuntimeException(err);
         }
@@ -451,8 +487,7 @@ public class ConsumerServiceImpl extends 
AbstractCommonService implements Consum
             for (ConsumerConfigInfo consumerConfigInfo : 
consumerConfigInfoList) {
                 brokerNameSet.addAll(consumerConfigInfo.getBrokerNameList());
             }
-        }
-        catch (Exception e) {
+        } catch (Exception e) {
             Throwables.throwIfUnchecked(e);
             throw new RuntimeException(e);
         }
@@ -476,10 +511,29 @@ public class ConsumerServiceImpl extends 
AbstractCommonService implements Consum
     public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, 
String clientId, boolean jstack) {
         try {
             return mqAdminExt.getConsumerRunningInfo(consumerGroup, clientId, 
jstack);
-        }
-        catch (Exception e) {
+        } catch (Exception e) {
             Throwables.throwIfUnchecked(e);
             throw new RuntimeException(e);
         }
     }
+
+    @Override
+    public GroupConsumeInfo refreshGroup(String address, String consumerGroup) 
{
+
+        if (isCacheBeingBuilt || cacheConsumeInfoList.isEmpty()) {
+            throw new RuntimeException("Cache is being built or empty, please 
try again later");
+        }
+        synchronized (cacheConsumeInfoList) {
+            for (int i = 0; i < cacheConsumeInfoList.size(); i++) {
+                GroupConsumeInfo groupConsumeInfo = 
cacheConsumeInfoList.get(i);
+                if (groupConsumeInfo.getGroup().equals(consumerGroup)) {
+                    GroupConsumeInfo updatedInfo = queryGroup(consumerGroup, 
"");
+                    updatedInfo.setUpdateTime(new Date());
+                    cacheConsumeInfoList.set(i, updatedInfo);
+                    return updatedInfo;
+                }
+            }
+        }
+        throw new RuntimeException("No consumer group information available");
+    }
 }
diff --git 
a/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java
 
b/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java
index 4f34fc6..14c00a2 100644
--- 
a/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java
+++ 
b/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java
@@ -43,28 +43,35 @@ import 
org.apache.rocketmq.dashboard.model.request.TopicConfigInfo;
 import org.apache.rocketmq.dashboard.model.request.TopicTypeList;
 import org.apache.rocketmq.dashboard.model.request.TopicTypeMeta;
 import org.apache.rocketmq.dashboard.service.AbstractCommonService;
+import org.apache.rocketmq.dashboard.service.ClusterInfoService;
 import org.apache.rocketmq.dashboard.service.TopicService;
+import org.apache.rocketmq.dashboard.service.client.MQAdminExtImpl;
+import org.apache.rocketmq.dashboard.support.GlobalExceptionHandler;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
 import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
 import org.apache.rocketmq.remoting.protocol.body.GroupList;
+import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper;
 import org.apache.rocketmq.remoting.protocol.body.TopicList;
 import org.apache.rocketmq.remoting.protocol.route.BrokerData;
 import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
 import org.apache.rocketmq.tools.command.CommandUtil;
 import org.joor.Reflect;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.BeanUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
-import org.springframework.util.CollectionUtils;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
@@ -73,9 +80,19 @@ import static 
org.apache.rocketmq.common.TopicAttributes.TOPIC_MESSAGE_TYPE_ATTR
 @Service
 public class TopicServiceImpl extends AbstractCommonService implements 
TopicService {
 
+    // Named after this class so its log lines are attributed to TopicServiceImpl.
+    private final Logger logger = LoggerFactory.getLogger(TopicServiceImpl.class);
+
+    @Autowired
+    private ClusterInfoService clusterInfoService;
+
+    private final ConcurrentMap<String, TopicRouteData> routeCache = new 
ConcurrentHashMap<>();
+    private final Object cacheLock = new Object();
+
     @Autowired
     private RMQConfigure configure;
 
+    private final ConcurrentMap<String, TopicConfig> topicConfigCache = new 
ConcurrentHashMap<>();
+
     @Override
     public TopicList fetchAllTopicList(boolean skipSysProcess, boolean 
skipRetryAndDlq) {
         try {
@@ -105,37 +122,63 @@ public class TopicServiceImpl extends 
AbstractCommonService implements TopicServ
 
     @Override
     public TopicTypeList examineAllTopicType() {
-        ArrayList<TopicTypeMeta> topicTypes = new ArrayList<>();
-        ArrayList<String> names = new ArrayList<>();
-        ArrayList<String> messageTypes = new ArrayList<>();
-        TopicList topicList = fetchAllTopicList(false, false);
-        checkTopicType(topicList, topicTypes);
-        topicTypes.sort((t1, t2) -> 
t1.getTopicName().compareTo(t2.getTopicName()));
-        for (TopicTypeMeta topicTypeMeta : topicTypes) {
-            names.add(topicTypeMeta.getTopicName());
-            messageTypes.add(topicTypeMeta.getMessageType());
-        }
+        // names and messageTypes are parallel lists: entry i of one describes entry i of the other.
+        List<String> messageTypes = new ArrayList<>();
+        List<String> names = new ArrayList<>();
+        // Hash-based membership check so de-duplication stays linear in the number of topics.
+        Set<String> seenNames = new HashSet<>();
+        ClusterInfo clusterInfo = clusterInfoService.get();
+        TopicList sysTopics = getSystemTopicList();
+        clusterInfo.getBrokerAddrTable().values().forEach(brokerAddr -> {
+            try {
+                TopicConfigSerializeWrapper topicConfigSerializeWrapper = mqAdminExt.getAllTopicConfig(brokerAddr.getBrokerAddrs().get(0L), 10000L);
+                for (TopicConfig topicConfig : topicConfigSerializeWrapper.getTopicConfigTable().values()) {
+                    // Read the attributes from the entry being iterated instead of looking it up again by name.
+                    TopicTypeMeta topicType = classifyTopicType(topicConfig.getTopicName(), topicConfig.getAttributes(), sysTopics.getTopicList());
+                    // Every broker is queried, so a topic can show up more than once; keep its first occurrence.
+                    if (!seenNames.add(topicType.getTopicName())) {
+                        continue;
+                    }
+                    names.add(topicType.getTopicName());
+                    messageTypes.add(topicType.getMessageType());
+                }
+            } catch (Exception e) {
+                // A failure on one broker is logged and skipped so the remaining brokers are still listed.
+                logger.warn("Failed to classify topic type for broker: " + brokerAddr, e);
+            }
+        });
+        // Add system topics that no broker reported, using the same "%SYS%" display prefix.
+        sysTopics.getTopicList().forEach(topicName -> {
+            String sysTopicName = String.format("%s%s", "%SYS%", topicName);
+            if (seenNames.add(sysTopicName)) {
+                names.add(sysTopicName);
+                messageTypes.add("SYSTEM");
+            }
+        });
+
         return new TopicTypeList(names, messageTypes);
     }
 
-    private void checkTopicType(TopicList topicList, ArrayList<TopicTypeMeta> 
topicTypes) {
-        for (String topicName : topicList.getTopicList()) {
-            TopicTypeMeta topicType = new TopicTypeMeta();
-            topicType.setTopicName(topicName);
-            if (topicName.startsWith("%R")) {
-                topicType.setMessageType("RETRY");
-            } else if (topicName.startsWith("%D")) {
-                topicType.setMessageType("DELAY");
-            } else if (topicName.startsWith("%S")) {
-                topicType.setMessageType("SYSTEM");
-            } else {
-                List<TopicConfigInfo> topicConfigInfos = 
examineTopicConfig(topicName);
-                if (!CollectionUtils.isEmpty(topicConfigInfos)) {
-                    
topicType.setMessageType(topicConfigInfos.get(0).getMessageType());
-                }
-            }
-            topicTypes.add(topicType);
+    private TopicTypeMeta classifyTopicType(String topicName, 
Map<String,String> attributes, Set<String> sysTopics) {
+        TopicTypeMeta topicType = new TopicTypeMeta();
+        topicType.setTopicName(topicName);
+
+        if (topicName.startsWith("%R")) {
+            topicType.setMessageType("RETRY");
+            return topicType;
+        } else if (topicName.startsWith("%D")) {
+            topicType.setMessageType("DLQ");
+            return topicType;
+        } else if (sysTopics.contains(topicName) || 
topicName.startsWith("rmq_sys") || 
topicName.equals("DefaultHeartBeatSyncerTopic")) {
+            topicType.setMessageType("SYSTEM");
+            topicType.setTopicName(String.format("%s%s", "%SYS%", topicName));
+            return topicType;
+        }
+        if (attributes == null || attributes.isEmpty()) {
+            topicType.setMessageType("UNSPECIFIED");
+            return topicType;
+        }
+
+        String messageType = 
attributes.get(TOPIC_MESSAGE_TYPE_ATTRIBUTE.getName());
+        if (StringUtils.isBlank(messageType)) {
+            messageType = TopicMessageType.UNSPECIFIED.name();
         }
+        topicType.setMessageType(messageType);
+
+        return topicType;
     }
 
     @Override
@@ -150,11 +193,24 @@ public class TopicServiceImpl extends 
AbstractCommonService implements TopicServ
 
     @Override
     public TopicRouteData route(String topic) {
-        try {
-            return mqAdminExt.examineTopicRouteInfo(topic);
-        } catch (Exception ex) {
-            Throwables.throwIfUnchecked(ex);
-            throw new RuntimeException(ex);
+        TopicRouteData cachedData = routeCache.get(topic);
+        if (cachedData != null) {
+            return cachedData;
+        }
+
+        synchronized (cacheLock) {
+            cachedData = routeCache.get(topic);
+            if (cachedData != null) {
+                return cachedData;
+            }
+            try {
+                TopicRouteData freshData = 
mqAdminExt.examineTopicRouteInfo(topic);
+                routeCache.put(topic, freshData);
+                return freshData;
+            } catch (Exception ex) {
+                Throwables.throwIfUnchecked(ex);
+                throw new RuntimeException(ex);
+            }
         }
     }
 
@@ -170,6 +226,7 @@ public class TopicServiceImpl extends AbstractCommonService 
implements TopicServ
 
     @Override
     public void createOrUpdate(TopicConfigInfo topicCreateOrUpdateRequest) {
+        MQAdminExtImpl.clearTopicConfigCache();
         TopicConfig topicConfig = new TopicConfig();
         BeanUtils.copyProperties(topicCreateOrUpdateRequest, topicConfig);
         String messageType = topicCreateOrUpdateRequest.getMessageType();
@@ -189,12 +246,15 @@ public class TopicServiceImpl extends 
AbstractCommonService implements TopicServ
         }
     }
 
-    @Override
     public TopicConfig examineTopicConfig(String topic, String brokerName) {
-        ClusterInfo clusterInfo = null;
         try {
-            clusterInfo = mqAdminExt.examineBrokerClusterInfo();
-            return 
mqAdminExt.examineTopicConfig(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(),
 topic);
+            ClusterInfo clusterInfo = clusterInfoService.get();
+
+            BrokerData brokerData = 
clusterInfo.getBrokerAddrTable().get(brokerName);
+            if (brokerData == null) {
+                throw new RuntimeException("Broker not found: " + brokerName);
+            }
+            return 
mqAdminExt.examineTopicConfig(brokerData.selectBrokerAddr(), topic);
         } catch (Exception e) {
             Throwables.throwIfUnchecked(e);
             throw new RuntimeException(e);
@@ -371,6 +431,14 @@ public class TopicServiceImpl extends 
AbstractCommonService implements TopicServ
 
     }
 
+    @Override
+    public boolean refreshTopicList() {
+        routeCache.clear();
+        clusterInfoService.refresh();
+        MQAdminExtImpl.clearTopicConfigCache();
+        return true;
+    }
+
     private void waitSendTraceFinish(DefaultMQProducer producer, boolean 
traceEnabled) {
         if (!traceEnabled) {
             return;
diff --git 
a/src/main/java/org/apache/rocketmq/dashboard/task/DashboardCollectTask.java 
b/src/main/java/org/apache/rocketmq/dashboard/task/DashboardCollectTask.java
index d58668b..c9a870c 100644
--- a/src/main/java/org/apache/rocketmq/dashboard/task/DashboardCollectTask.java
+++ b/src/main/java/org/apache/rocketmq/dashboard/task/DashboardCollectTask.java
@@ -34,6 +34,7 @@ import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import javax.annotation.Resource;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.dashboard.service.ConsumerService;
 import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
 import org.apache.rocketmq.remoting.protocol.body.KVTable;
 import org.apache.rocketmq.remoting.protocol.body.TopicList;
@@ -59,6 +60,9 @@ public class DashboardCollectTask {
     @Resource
     private DashboardCollectService dashboardCollectService;
 
+    @Resource
+    private ConsumerService consumerService;
+
     private final static Logger log = 
LoggerFactory.getLogger(DashboardCollectTask.class);
 
     @Resource
@@ -89,6 +93,13 @@ public class DashboardCollectTask {
         }
     }
 
+    @Scheduled(cron = "0 0 2 * * ?")
+    public void collectConsumer() {
+        consumerService.queryGroupList(false, null);
+    }
+
+
+
     @Scheduled(cron = "0 0/1 * * * ?")
     public void collectBroker() {
         if (!rmqConfigure.isEnableDashBoardCollect()) {
diff --git a/src/main/resources/static/src/consumer.js 
b/src/main/resources/static/src/consumer.js
index 8cf845b..8d5f2c7 100644
--- a/src/main/resources/static/src/consumer.js
+++ b/src/main/resources/static/src/consumer.js
@@ -70,6 +70,34 @@ module.controller('consumerController', ['$scope', 
'ngDialog', '$http', 'Notific
         }
         $scope.filterList($scope.paginationConf.currentPage)
     };
+    // Re-query one consumer group on the server and replace its entry in the local list.
+    $scope.refreshConsumerGroup = function (groupName) {
+        var hideLoader = function () {
+            $('#loaderConsumer').addClass("hide-myloader");
+        };
+
+        //Show loader
+        $('#loaderConsumer').removeClass("hide-myloader");
+
+        $http({
+            method: "GET",
+            url: "/consumer/group.refresh",
+            params: {
+                address: $scope.isRmqVersionV5() ? localStorage.getItem('proxyAddr') : null,
+                consumerGroup: groupName
+            }
+        }).success(function (resp) {
+            // Hide the loader on every outcome; otherwise it stays visible after a server-reported error.
+            hideLoader();
+            if (resp.status == 0) {
+                for (var i = 0; i < $scope.allConsumerGrouopList.length; i++) {
+                    if ($scope.allConsumerGrouopList[i].group === groupName) {
+                        $scope.allConsumerGrouopList[i] = resp.data;
+                        break;
+                    }
+                }
+                $scope.showConsumerGroupList($scope.paginationConf.currentPage, $scope.allConsumerGrouopList.length);
+            } else {
+                Notification.error({message: resp.errMsg, delay: 2000});
+            }
+        }).error(function () {
+            // The request itself failed: still release the loader.
+            hideLoader();
+        });
+    };
+
     $scope.refreshConsumerData = function () {
         //Show loader
         $('#loaderConsumer').removeClass("hide-myloader");
@@ -421,4 +449,4 @@ module.controller('consumerTopicViewDialogController', 
['$scope', 'ngDialog', '$
             });
         };
     }]
-);
\ No newline at end of file
+);
diff --git a/src/main/resources/static/src/i18n/en.js 
b/src/main/resources/static/src/i18n/en.js
index 2c1450d..7fbb042 100644
--- a/src/main/resources/static/src/i18n/en.js
+++ b/src/main/resources/static/src/i18n/en.js
@@ -135,4 +135,5 @@ var en = {
     "MESSAGE_TYPE_FIFO": "FIFO",
     "MESSAGE_TYPE_DELAY": "DELAY",
     "MESSAGE_TYPE_TRANSACTION": "TRANSACTION",
+    "UPDATE_TIME": "Update Time",
 }
diff --git a/src/main/resources/static/src/i18n/zh.js 
b/src/main/resources/static/src/i18n/zh.js
index 2f0e6f3..ec2ebdd 100644
--- a/src/main/resources/static/src/i18n/zh.js
+++ b/src/main/resources/static/src/i18n/zh.js
@@ -136,4 +136,5 @@ var zh = {
     "MESSAGE_TYPE_FIFO": "顺序消息",
     "MESSAGE_TYPE_DELAY": "定时/延时消息",
     "MESSAGE_TYPE_TRANSACTION": "事务消息",
-}
\ No newline at end of file
+    "UPDATE_TIME": "更新时间",
+}
diff --git a/src/main/resources/static/src/message.js 
b/src/main/resources/static/src/message.js
index a496285..c980d9e 100644
--- a/src/main/resources/static/src/message.js
+++ b/src/main/resources/static/src/message.js
@@ -277,4 +277,4 @@ module.controller('messageDetailViewDialogController', 
['$scope', 'ngDialog', '$
             $scope.messageTrackShowList = canShowList;
         });
     }]
-);
\ No newline at end of file
+);
diff --git a/src/main/resources/static/src/topic.js 
b/src/main/resources/static/src/topic.js
index 7ad997c..13c3dbb 100644
--- a/src/main/resources/static/src/topic.js
+++ b/src/main/resources/static/src/topic.js
@@ -59,7 +59,7 @@ module.controller('topicController', ['$scope', 'ngDialog', 
'$http', 'Notificati
     $scope.userRole = $window.sessionStorage.getItem("userrole");
     $scope.writeOperationEnabled = $scope.userRole == null ? true : 
($scope.userRole == 1 ? true : false);
 
-    $scope.refreshTopicList = function () {
+    $scope.getTopicList = function () {
         $http({
             method: "GET",
             url: "topic/list.queryTopicType"
@@ -77,7 +77,34 @@ module.controller('topicController', ['$scope', 'ngDialog', 
'$http', 'Notificati
         });
     };
 
-    $scope.refreshTopicList();
+    // POST topic/refresh and, once the server confirms, reload the topic name and type lists.
+    $scope.refreshTopicList = function () {
+        $http({
+            method: "POST",
+            url: "topic/refresh"
+        }).success(function (resp) {
+            if (resp.status == 0 && resp.data == true) {
+                $http({
+                    method: "GET",
+                    url: "topic/list.queryTopicType"
+                }).success(function (resp1) {
+                    if (resp1.status == 0) {
+                        $scope.allTopicNameList = resp1.data.topicNameList;
+                        $scope.allMessageTypeList = resp1.data.messageTypeList;
+                        // No debug dump of the response here: serializing the full topic list on every refresh is wasteful.
+                        $scope.showTopicList(1, $scope.allTopicNameList.length);
+                    } else {
+                        Notification.error({message: resp1.errMsg, delay: 5000});
+                    }
+                });
+
+            } else {
+                Notification.error({message: resp.errMsg, delay: 5000});
+            }
+        });
+    };
+
+    $scope.getTopicList();
 
     $scope.filterStr = "";
     $scope.$watch('filterStr', function () {
@@ -127,17 +154,17 @@ module.controller('topicController', ['$scope', 
'ngDialog', '$http', 'Notificati
 
     $scope.filterByType = function (str, type) {
         if ($scope.filterRetry) {
-            if (str.startsWith("%R")) {
+            if (type.includes("RETRY")) {
                 return true
             }
         }
         if ($scope.filterDLQ) {
-            if (str.startsWith("%D")) {
+            if (type.includes("DLQ")) {
                 return true
             }
         }
         if ($scope.filterSystem) {
-            if (str.startsWith("%S")) {
+            if (type.includes("SYSTEM")) {
                 return true
             }
         }
@@ -386,10 +413,6 @@ module.controller('topicController', ['$scope', 
'ngDialog', '$http', 'Notificati
             if (resp.status == 0) {
                 console.log(resp);
                 ngDialog.open({
-                    preCloseCallback: function (value) {
-                        // Refresh topic list
-                        $scope.refreshTopicList();
-                    },
                     template: 'topicModifyDialog',
                     controller: 'topicModifyDialogController',
                     data: {
@@ -540,4 +563,4 @@ module.controller('routerViewDialogController', ['$scope', 
'ngDialog', '$http',
             })
         };
     }]
-);
\ No newline at end of file
+);
diff --git a/src/main/resources/static/view/pages/consumer.html 
b/src/main/resources/static/view/pages/consumer.html
index 68c7786..c187f72 100644
--- a/src/main/resources/static/view/pages/consumer.html
+++ b/src/main/resources/static/view/pages/consumer.html
@@ -33,9 +33,6 @@
                 <button class="btn btn-raised btn-sm btn-primary" 
type="button" ng-show="{{writeOperationEnabled}}"
                         ng-click="openAddDialog()">{{'ADD' | translate}}/ 
{{'UPDATE' | translate}}
                 </button>
-                <button class="btn btn-raised btn-sm btn-primary" 
type="button" ng-click="refreshConsumerData()">
-                    {{'REFRESH' | translate}}
-                </button>
                 <md-switch class="md-primary" md-no-ink aria-label="Switch No 
Ink" ng-model="intervalProcessSwitch">
                     {{'AUTO_REFRESH' | translate}}
                 </md-switch>
@@ -53,6 +50,7 @@
                         <th class="text-center">{{ 'MODE' | translate}}</th>
                         <th class="text-center"><a 
ng-click="sortByKey('consumeTps')">TPS</a></th>
                         <th class="text-center"><a 
ng-click="sortByKey('diffTotal')">{{ 'DELAY' | translate}}</a></th>
+                        <th class="text-center">{{ 'UPDATE_TIME' | 
translate}}</th>
                         <th class="text-center">{{ 'OPERATION' | 
translate}}</th>
                     </tr>
                     <tr ng-repeat="consumerGroup in consumerGroupShowList"
@@ -65,6 +63,7 @@
                         <td 
class="text-center">{{consumerGroup.messageModel}}</td>
                         <td 
class="text-center">{{consumerGroup.consumeTps}}</td>
                         <td 
class="text-center">{{consumerGroup.diffTotal}}</td>
+                        <td 
class="text-center">{{consumerGroup.updateTime}}</td>
                         <td class="text-left">
                             <button name="client" 
ng-click="client(consumerGroup.group, consumerGroup.address)"
                                     class="btn btn-raised btn-sm btn-primary"
@@ -85,6 +84,9 @@
                                     ng-show="{{!sysFlag && 
writeOperationEnabled}}"
                                     type="button">{{'DELETE' | translate}}
                             </button>
+                            <button class="btn btn-raised btn-sm btn-primary" 
type="button" ng-click="refreshConsumerGroup(consumerGroup.group)">
+                                {{'REFRESH' | translate}}
+                            </button>
 
                         </td>
                     </tr>
@@ -568,4 +570,4 @@
             </div>
         </div>
     </div>
-</script>
\ No newline at end of file
+</script>

Reply via email to