This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-dashboard.git


The following commit(s) were added to refs/heads/master by this push:
     new d58e13d  Proxy Support And ConsumerGroup Enhancement (#207)
d58e13d is described below

commit d58e13da95b992cb3d6b05de1b1fe0ee80cb1e87
Author: Akai <91858554+1294566...@users.noreply.github.com>
AuthorDate: Wed Jun 12 09:12:19 2024 +0800

    Proxy Support And ConsumerGroup Enhancement (#207)
    
    * Support dashboard v4-v5 switch And query for v5 topic
    
    * Modify tag name
    
    * Support proxy-module And Fix the problem of showing wrong 
consumerGroup-info
    
    ---------
    
    Co-authored-by: yuanziwei <yuanzi...@xiaomi.com>
---
 .../rocketmq/dashboard/config/RMQConfigure.java    | 23 +++++
 .../dashboard/controller/ConsumerController.java   | 16 ++--
 .../dashboard/controller/ProxyController.java      | 54 ++++++++++++
 .../rocketmq/dashboard/model/GroupConsumeInfo.java | 27 ++++--
 .../dashboard/service/ConsumerService.java         |  8 +-
 .../rocketmq/dashboard/service/ProxyService.java   | 28 +++++++
 .../dashboard/service/client/MQAdminExtImpl.java   |  5 +-
 .../dashboard/service/client/ProxyAdmin.java       | 28 +++++++
 .../dashboard/service/client/ProxyAdminImpl.java   | 60 +++++++++++++
 .../service/impl/ConsumerServiceImpl.java          | 67 +++++++++++----
 .../dashboard/service/impl/ProxyServiceImpl.java   | 59 +++++++++++++
 .../rocketmq/dashboard/task/MonitorTask.java       |  2 +-
 src/main/resources/application.yml                 |  3 +
 src/main/resources/static/index.html               |  1 +
 src/main/resources/static/src/app.js               |  3 +
 src/main/resources/static/src/consumer.js          |  9 +-
 src/main/resources/static/src/i18n/en.js           |  1 +
 src/main/resources/static/src/i18n/zh.js           |  1 +
 src/main/resources/static/src/proxy.js             | 97 ++++++++++++++++++++++
 src/main/resources/static/view/layout/_header.html |  1 +
 src/main/resources/static/view/pages/consumer.html |  4 +-
 src/main/resources/static/view/pages/proxy.html    | 67 +++++++++++++++
 22 files changed, 516 insertions(+), 48 deletions(-)

diff --git 
a/src/main/java/org/apache/rocketmq/dashboard/config/RMQConfigure.java 
b/src/main/java/org/apache/rocketmq/dashboard/config/RMQConfigure.java
index 991a2d8..5ce21ff 100644
--- a/src/main/java/org/apache/rocketmq/dashboard/config/RMQConfigure.java
+++ b/src/main/java/org/apache/rocketmq/dashboard/config/RMQConfigure.java
@@ -43,6 +43,8 @@ public class RMQConfigure {
     //use rocketmq.namesrv.addr first,if it is empty,than use system proerty 
or system env
     private volatile String namesrvAddr = 
System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, 
System.getenv(MixAll.NAMESRV_ADDR_ENV));
 
+    private volatile String proxyAddr;
+
     private volatile String isVIPChannel = 
System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true");
 
 
@@ -62,6 +64,8 @@ public class RMQConfigure {
 
     private List<String> namesrvAddrs = new ArrayList<>();
 
+    private List<String> proxyAddrs = new ArrayList<>();
+
     public String getAccessKey() {
         return accessKey;
     }
@@ -86,6 +90,25 @@ public class RMQConfigure {
         return namesrvAddrs;
     }
 
+    public List<String> getProxyAddrs() {
+        return this.proxyAddrs;
+    }
+
+    public void setProxyAddrs(List<String> proxyAddrs) {
+        this.proxyAddrs = proxyAddrs;
+        if (CollectionUtils.isNotEmpty(proxyAddrs)) {
+            this.setProxyAddr(proxyAddrs.get(0));
+        }
+    }
+
+    public String getProxyAddr() {
+        return proxyAddr;
+    }
+
+    public void setProxyAddr(String proxyAddr) {
+        this.proxyAddr = proxyAddr;
+    }
+
     public void setNamesrvAddrs(List<String> namesrvAddrs) {
         this.namesrvAddrs = namesrvAddrs;
         if (CollectionUtils.isNotEmpty(namesrvAddrs)) {
diff --git 
a/src/main/java/org/apache/rocketmq/dashboard/controller/ConsumerController.java
 
b/src/main/java/org/apache/rocketmq/dashboard/controller/ConsumerController.java
index d9f22e4..96fc056 100644
--- 
a/src/main/java/org/apache/rocketmq/dashboard/controller/ConsumerController.java
+++ 
b/src/main/java/org/apache/rocketmq/dashboard/controller/ConsumerController.java
@@ -47,14 +47,14 @@ public class ConsumerController {
 
     @RequestMapping(value = "/groupList.query")
     @ResponseBody
-    public Object list(@RequestParam(value = "skipSysGroup", required = false) 
boolean skipSysGroup) {
-        return consumerService.queryGroupList(skipSysGroup);
+    public Object list(@RequestParam(value = "skipSysGroup", required = false) 
boolean skipSysGroup, String address) {
+        return consumerService.queryGroupList(skipSysGroup, address);
     }
 
     @RequestMapping(value = "/group.query")
     @ResponseBody
-    public Object groupQuery(@RequestParam String consumerGroup) {
-        return consumerService.queryGroup(consumerGroup);
+    public Object groupQuery(@RequestParam String consumerGroup, String 
address) {
+        return consumerService.queryGroup(consumerGroup, address);
     }
 
     @RequestMapping(value = "/resetOffset.do", method = {RequestMethod.POST})
@@ -99,14 +99,14 @@ public class ConsumerController {
 
     @RequestMapping(value = "/queryTopicByConsumer.query")
     @ResponseBody
-    public Object queryConsumerByTopic(@RequestParam String consumerGroup) {
-        return consumerService.queryConsumeStatsListByGroupName(consumerGroup);
+    public Object queryConsumerByTopic(@RequestParam String consumerGroup, 
String address) {
+        return consumerService.queryConsumeStatsListByGroupName(consumerGroup, 
address);
     }
 
     @RequestMapping(value = "/consumerConnection.query")
     @ResponseBody
-    public Object consumerConnection(@RequestParam(required = false) String 
consumerGroup) {
-        ConsumerConnection consumerConnection = 
consumerService.getConsumerConnection(consumerGroup);
+    public Object consumerConnection(@RequestParam(required = false) String 
consumerGroup, String address) {
+        ConsumerConnection consumerConnection = 
consumerService.getConsumerConnection(consumerGroup, address);
         
consumerConnection.setConnectionSet(ConnectionInfo.buildConnectionInfoHashSet(consumerConnection.getConnectionSet()));
         return consumerConnection;
     }
diff --git 
a/src/main/java/org/apache/rocketmq/dashboard/controller/ProxyController.java 
b/src/main/java/org/apache/rocketmq/dashboard/controller/ProxyController.java
new file mode 100644
index 0000000..27aa59d
--- /dev/null
+++ 
b/src/main/java/org/apache/rocketmq/dashboard/controller/ProxyController.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.dashboard.controller;
+
+import org.apache.rocketmq.dashboard.permisssion.Permission;
+import org.apache.rocketmq.dashboard.service.ProxyService;
+import org.springframework.stereotype.Controller;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.ResponseBody;
+
+import javax.annotation.Resource;
+
+@Controller
+@RequestMapping("/proxy")
+@Permission
+public class ProxyController {
+    @Resource
+    private ProxyService proxyService;
+    @RequestMapping(value = "/homePage.query", method = RequestMethod.GET)
+    @ResponseBody
+    public Object homePage() {
+        return proxyService.getProxyHomePage();
+    }
+
+    @RequestMapping(value = "/addProxyAddr.do", method = RequestMethod.POST)
+    @ResponseBody
+    public Object addProxyAddr(@RequestParam String newProxyAddr) {
+        proxyService.addProxyAddrList(newProxyAddr);
+        return true;
+    }
+
+    @RequestMapping(value = "/updateProxyAddr.do", method = RequestMethod.POST)
+    @ResponseBody
+    public Object updateProxyAddr(@RequestParam String proxyAddr) {
+        proxyService.updateProxyAddrList(proxyAddr);
+        return true;
+    }
+}
diff --git 
a/src/main/java/org/apache/rocketmq/dashboard/model/GroupConsumeInfo.java 
b/src/main/java/org/apache/rocketmq/dashboard/model/GroupConsumeInfo.java
index 0d19af9..db11c41 100644
--- a/src/main/java/org/apache/rocketmq/dashboard/model/GroupConsumeInfo.java
+++ b/src/main/java/org/apache/rocketmq/dashboard/model/GroupConsumeInfo.java
@@ -19,12 +19,15 @@ package org.apache.rocketmq.dashboard.model;
 import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
 import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
 
+import java.util.List;
+
 public class GroupConsumeInfo implements Comparable<GroupConsumeInfo> {
     private String group;
     private String version;
     private int count;
     private ConsumeType consumeType;
     private MessageModel messageModel;
+    private List<String> address;
     private int consumeTps;
     private long diffTotal = -1;
     private String subGroupType = "NORMAL";
@@ -70,6 +73,22 @@ public class GroupConsumeInfo implements 
Comparable<GroupConsumeInfo> {
         this.diffTotal = diffTotal;
     }
 
+    public List<String> getAddress() {
+        return address;
+    }
+
+    public void setAddress(List<String> address) {
+        this.address = address;
+    }
+
+    public String getSubGroupType() {
+        return subGroupType;
+    }
+
+    public void setSubGroupType(String subGroupType) {
+        this.subGroupType = subGroupType;
+    }
+
     @Override
     public int compareTo(GroupConsumeInfo o) {
         if (this.count != o.count) {
@@ -93,12 +112,4 @@ public class GroupConsumeInfo implements 
Comparable<GroupConsumeInfo> {
     public void setVersion(String version) {
         this.version = version;
     }
-
-    public String getSubGroupType() {
-        return subGroupType;
-    }
-
-    public void setSubGroupType(String subGroupType) {
-        this.subGroupType = subGroupType;
-    }
 }
diff --git 
a/src/main/java/org/apache/rocketmq/dashboard/service/ConsumerService.java 
b/src/main/java/org/apache/rocketmq/dashboard/service/ConsumerService.java
index c475931..e284c44 100644
--- a/src/main/java/org/apache/rocketmq/dashboard/service/ConsumerService.java
+++ b/src/main/java/org/apache/rocketmq/dashboard/service/ConsumerService.java
@@ -31,12 +31,12 @@ import java.util.Map;
 import java.util.Set;
 
 public interface ConsumerService {
-    List<GroupConsumeInfo> queryGroupList(boolean skipSysGroup);
+    List<GroupConsumeInfo> queryGroupList(boolean skipSysGroup,String address);
 
-    GroupConsumeInfo queryGroup(String consumerGroup);
+    GroupConsumeInfo queryGroup(String consumerGroup, String address);
 
 
-    List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String groupName);
+    List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String groupName, 
String address);
 
     List<TopicConsumerInfo> queryConsumeStatsList(String topic, String 
groupName);
 
@@ -52,7 +52,7 @@ public interface ConsumerService {
 
     Set<String> fetchBrokerNameSetBySubscriptionGroup(String group);
 
-    ConsumerConnection getConsumerConnection(String consumerGroup);
+    ConsumerConnection getConsumerConnection(String consumerGroup, String 
address);
 
     ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String 
clientId, boolean jstack);
 }
diff --git 
a/src/main/java/org/apache/rocketmq/dashboard/service/ProxyService.java 
b/src/main/java/org/apache/rocketmq/dashboard/service/ProxyService.java
new file mode 100644
index 0000000..2a64680
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/dashboard/service/ProxyService.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.dashboard.service;
+
+import java.util.Map;
+
+public interface ProxyService {
+
+    void addProxyAddrList(String proxyAddr);
+
+    void updateProxyAddrList(String proxyAddr);
+
+    Map<String, Object>  getProxyHomePage();
+}
diff --git 
a/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java
 
b/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java
index 360c0e3..0281c5c 100644
--- 
a/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java
+++ 
b/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java
@@ -627,7 +627,7 @@ public class MQAdminExtImpl implements MQAdminExt {
             long timeoutMillis) throws InterruptedException, 
RemotingTimeoutException, RemotingSendRequestException,
             RemotingConnectException, MQBrokerException {
         // TODO Auto-generated method stub
-        throw new UnsupportedOperationException("Unimplemented method 
'examineConsumeStats'");
+        return 
MQAdminInstance.threadLocalMQAdminExt().examineConsumeStats(brokerAddr, 
consumerGroup, topicName, timeoutMillis);
     }
 
     @Override
@@ -639,8 +639,7 @@ public class MQAdminExtImpl implements MQAdminExt {
     @Override
     public ConsumerConnection examineConsumerConnectionInfo(String 
consumerGroup, String brokerAddr)
             throws InterruptedException, MQBrokerException, RemotingException, 
MQClientException {
-        // TODO Auto-generated method stub
-        throw new UnsupportedOperationException("Unimplemented method 
'examineConsumerConnectionInfo'");
+        return 
MQAdminInstance.threadLocalMQAdminExt().examineConsumerConnectionInfo(consumerGroup,
 brokerAddr);
     }
 
     @Override
diff --git 
a/src/main/java/org/apache/rocketmq/dashboard/service/client/ProxyAdmin.java 
b/src/main/java/org/apache/rocketmq/dashboard/service/client/ProxyAdmin.java
new file mode 100644
index 0000000..4344c7c
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/dashboard/service/client/ProxyAdmin.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.dashboard.service.client;
+
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
+
+public interface ProxyAdmin {
+
+    ConsumerConnection examineConsumerConnectionInfo(String addr, String 
consumerGroup) throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, InterruptedException, MQBrokerException;
+}
diff --git 
a/src/main/java/org/apache/rocketmq/dashboard/service/client/ProxyAdminImpl.java
 
b/src/main/java/org/apache/rocketmq/dashboard/service/client/ProxyAdminImpl.java
new file mode 100644
index 0000000..eadae12
--- /dev/null
+++ 
b/src/main/java/org/apache/rocketmq/dashboard/service/client/ProxyAdminImpl.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.dashboard.service.client;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.pool2.impl.GenericObjectPool;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
+import 
org.apache.rocketmq.remoting.protocol.header.GetConsumerConnectionListRequestHeader;
+import org.apache.rocketmq.tools.admin.MQAdminExt;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import static 
org.apache.rocketmq.remoting.protocol.RequestCode.GET_CONSUMER_CONNECTION_LIST;
+
+@Slf4j
+@Service
+public class ProxyAdminImpl implements ProxyAdmin {
+    @Autowired
+    private GenericObjectPool<MQAdminExt> mqAdminExtPool;
+
+    @Override
+    public ConsumerConnection examineConsumerConnectionInfo(String addr, 
String consumerGroup) throws RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException, InterruptedException, 
MQBrokerException {
+        try {
+            MQAdminInstance.createMQAdmin(mqAdminExtPool);
+            RemotingClient remotingClient = 
MQAdminInstance.threadLocalRemotingClient();
+            GetConsumerConnectionListRequestHeader requestHeader = new 
GetConsumerConnectionListRequestHeader();
+            requestHeader.setConsumerGroup(consumerGroup);
+            RemotingCommand request = 
RemotingCommand.createRequestCommand(GET_CONSUMER_CONNECTION_LIST, 
requestHeader);
+            RemotingCommand response = remotingClient.invokeSync(addr, 
request, 3000);
+            switch (response.getCode()) {
+                case 0:
+                    return ConsumerConnection.decode(response.getBody(), 
ConsumerConnection.class);
+                default:
+                    throw new MQBrokerException(response.getCode(), 
response.getRemark(), addr);
+            }
+        } finally {
+            MQAdminInstance.returnMQAdmin(mqAdminExtPool);
+        }
+    }
+}
diff --git 
a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java
 
b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java
index a1cf9ff..9bc37ab 100644
--- 
a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java
+++ 
b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java
@@ -23,8 +23,10 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -44,6 +46,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.dashboard.service.client.ProxyAdmin;
 import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
 import org.apache.rocketmq.remoting.protocol.admin.RollbackStats;
 import org.apache.rocketmq.common.message.MessageQueue;
@@ -77,6 +80,8 @@ import org.springframework.stereotype.Service;
 public class ConsumerServiceImpl extends AbstractCommonService implements 
ConsumerService, InitializingBean, DisposableBean {
     private Logger logger = LoggerFactory.getLogger(ConsumerServiceImpl.class);
 
+    @Resource
+    protected ProxyAdmin proxyAdmin;
     @Resource
     private RMQConfigure configure;
 
@@ -119,25 +124,33 @@ public class ConsumerServiceImpl extends 
AbstractCommonService implements Consum
     }
 
     @Override
-    public List<GroupConsumeInfo> queryGroupList(boolean skipSysGroup) {
-        Set<String> consumerGroupSet = Sets.newHashSet();
+    public List<GroupConsumeInfo> queryGroupList(boolean skipSysGroup, String 
address) {
+        HashMap<String, List<String>> consumerGroupMap = Maps.newHashMap();
         try {
             ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
             for (BrokerData brokerData : 
clusterInfo.getBrokerAddrTable().values()) {
                 SubscriptionGroupWrapper subscriptionGroupWrapper = 
mqAdminExt.getAllSubscriptionGroup(brokerData.selectBrokerAddr(), 3000L);
-                
consumerGroupSet.addAll(subscriptionGroupWrapper.getSubscriptionGroupTable().keySet());
+                for (String groupName : 
subscriptionGroupWrapper.getSubscriptionGroupTable().keySet()) {
+                    if (!consumerGroupMap.containsKey(groupName)) {
+                        consumerGroupMap.putIfAbsent(groupName, new 
ArrayList<>());
+                    }
+                    List<String> addresses = consumerGroupMap.get(groupName);
+                    addresses.add(brokerData.selectBrokerAddr());
+                    consumerGroupMap.put(groupName, addresses);
+                }
             }
-        }
-        catch (Exception err) {
+        } catch (Exception err) {
             Throwables.throwIfUnchecked(err);
             throw new RuntimeException(err);
         }
         List<GroupConsumeInfo> groupConsumeInfoList = 
Collections.synchronizedList(Lists.newArrayList());
-        CountDownLatch countDownLatch = new 
CountDownLatch(consumerGroupSet.size());
-        for (String consumerGroup : consumerGroupSet) {
+        CountDownLatch countDownLatch = new 
CountDownLatch(consumerGroupMap.size());
+        for (Map.Entry<String, List<String>> entry : 
consumerGroupMap.entrySet()) {
+            String consumerGroup = entry.getKey();
             executorService.submit(() -> {
                 try {
-                    GroupConsumeInfo consumeInfo = queryGroup(consumerGroup);
+                    GroupConsumeInfo consumeInfo = queryGroup(consumerGroup, 
address);
+                    consumeInfo.setAddress(entry.getValue());
                     groupConsumeInfoList.add(consumeInfo);
                 } catch (Exception e) {
                     logger.error("queryGroup exception, consumerGroup: {}", 
consumerGroup, e);
@@ -165,7 +178,7 @@ public class ConsumerServiceImpl extends 
AbstractCommonService implements Consum
     }
 
     @Override
-    public GroupConsumeInfo queryGroup(String consumerGroup) {
+    public GroupConsumeInfo queryGroup(String consumerGroup, String address) {
         GroupConsumeInfo groupConsumeInfo = new GroupConsumeInfo();
         try {
             ConsumeStats consumeStats = null;
@@ -182,9 +195,12 @@ public class ConsumerServiceImpl extends 
AbstractCommonService implements Consum
                     
.allMatch(SubscriptionGroupConfig::isConsumeMessageOrderly);
 
             try {
-                consumerConnection = 
mqAdminExt.examineConsumerConnectionInfo(consumerGroup);
-            }
-            catch (Exception e) {
+                if (StringUtils.isNotEmpty(address)) {
+                    consumerConnection = 
proxyAdmin.examineConsumerConnectionInfo(address, consumerGroup);
+                } else {
+                    consumerConnection = 
mqAdminExt.examineConsumerConnectionInfo(consumerGroup);
+                }
+            } catch (Exception e) {
                 logger.warn("examineConsumeStats exception to consumerGroup 
{}, response [{}]", consumerGroup, e.getMessage());
             }
 
@@ -217,8 +233,18 @@ public class ConsumerServiceImpl extends 
AbstractCommonService implements Consum
     }
 
     @Override
-    public List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String 
groupName) {
-        return queryConsumeStatsList(null, groupName);
+    public List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String 
groupName, String address) {
+        ConsumeStats consumeStats;
+        String topic = null;
+        try {
+            String[] addresses = address.split(",");
+            String addr = addresses[0];
+            consumeStats = mqAdminExt.examineConsumeStats(addr, groupName, 
null, 3000);
+        } catch (Exception e) {
+            Throwables.throwIfUnchecked(e);
+            throw new RuntimeException(e);
+        }
+        return toTopicConsumerInfoList(topic, consumeStats, groupName);
     }
 
     @Override
@@ -231,6 +257,10 @@ public class ConsumerServiceImpl extends 
AbstractCommonService implements Consum
             Throwables.throwIfUnchecked(e);
             throw new RuntimeException(e);
         }
+        return toTopicConsumerInfoList(topic, consumeStats, groupName);
+    }
+
+    private List<TopicConsumerInfo> toTopicConsumerInfoList(String topic, 
ConsumeStats consumeStats, String groupName) {
         List<MessageQueue> mqList = 
Lists.newArrayList(Iterables.filter(consumeStats.getOffsetTable().keySet(), new 
Predicate<MessageQueue>() {
             @Override
             public boolean apply(MessageQueue o) {
@@ -431,11 +461,12 @@ public class ConsumerServiceImpl extends 
AbstractCommonService implements Consum
     }
 
     @Override
-    public ConsumerConnection getConsumerConnection(String consumerGroup) {
+    public ConsumerConnection getConsumerConnection(String consumerGroup, 
String address) {
         try {
-            return mqAdminExt.examineConsumerConnectionInfo(consumerGroup);
-        }
-        catch (Exception e) {
+            String[] addresses = address.split(",");
+            String addr = addresses[0];
+            return mqAdminExt.examineConsumerConnectionInfo(consumerGroup, 
addr);
+        } catch (Exception e) {
             Throwables.throwIfUnchecked(e);
             throw new RuntimeException(e);
         }
diff --git 
a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ProxyServiceImpl.java
 
b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ProxyServiceImpl.java
new file mode 100644
index 0000000..07e63b3
--- /dev/null
+++ 
b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ProxyServiceImpl.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.dashboard.service.impl;
+
+import com.google.common.collect.Maps;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.dashboard.config.RMQConfigure;
+import org.apache.rocketmq.dashboard.service.ProxyService;
+import org.apache.rocketmq.dashboard.service.client.ProxyAdmin;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+import java.util.List;
+import java.util.Map;
+
+@Slf4j
+@Service
+public class ProxyServiceImpl implements ProxyService {
+    @Resource
+    protected ProxyAdmin proxyAdmin;
+    @Resource
+    private RMQConfigure configure;
+
+    @Override
+    public void addProxyAddrList(String proxyAddr) {
+        List<String> proxyAddrs = configure.getProxyAddrs();
+        if (proxyAddrs != null && !proxyAddrs.contains(proxyAddr)) {
+            proxyAddrs.add(proxyAddr);
+        }
+        configure.setProxyAddrs(proxyAddrs);
+    }
+
+    @Override
+    public void updateProxyAddrList(String proxyAddr) {
+        configure.setProxyAddr(proxyAddr);
+    }
+
+    @Override
+    public Map<String, Object> getProxyHomePage() {
+        Map<String, Object> homePageInfoMap = Maps.newHashMap();
+        homePageInfoMap.put("currentProxyAddr", configure.getProxyAddr());
+        homePageInfoMap.put("proxyAddrList", configure.getProxyAddrs());
+        return homePageInfoMap;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/dashboard/task/MonitorTask.java 
b/src/main/java/org/apache/rocketmq/dashboard/task/MonitorTask.java
index 710929b..3c8a77e 100644
--- a/src/main/java/org/apache/rocketmq/dashboard/task/MonitorTask.java
+++ b/src/main/java/org/apache/rocketmq/dashboard/task/MonitorTask.java
@@ -40,7 +40,7 @@ public class MonitorTask {
 //    @Scheduled(cron = "* * * * * ?")
     public void scanProblemConsumeGroup() {
         for (Map.Entry<String, ConsumerMonitorConfig> configEntry : 
monitorService.queryConsumerMonitorConfig().entrySet()) {
-            GroupConsumeInfo consumeInfo = 
consumerService.queryGroup(configEntry.getKey());
+            GroupConsumeInfo consumeInfo = 
consumerService.queryGroup(configEntry.getKey(), null);
             if (consumeInfo.getCount() < configEntry.getValue().getMinCount() 
|| consumeInfo.getDiffTotal() > configEntry.getValue().getMaxDiffTotal()) {
                 logger.info("op=look consumeInfo {}", 
JsonUtil.obj2String(consumeInfo)); // notify the alert system
             }
diff --git a/src/main/resources/application.yml 
b/src/main/resources/application.yml
index 090e421..fe4d283 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -59,6 +59,9 @@ rocketmq:
     # must create userInfo file: ${rocketmq.config.dataPath}/users.properties 
if the login is required
     loginRequired: false
     useTLS: false
+    proxyAddr: 127.0.0.1:8080
+    proxyAddrs:
+      - 127.0.0.1:8080
     # set the accessKey and secretKey if you used acl
 #    accessKey: rocketmq2
 #    secretKey: 12345678
diff --git a/src/main/resources/static/index.html 
b/src/main/resources/static/index.html
index c2bf349..ee3c3fe 100644
--- a/src/main/resources/static/index.html
+++ b/src/main/resources/static/index.html
@@ -104,6 +104,7 @@
 <script type="text/javascript" 
src="src/tools/tools.js?v=201703171710"></script>
 <script type="text/javascript" src="src/cluster.js?timestamp=4"></script>
 <script type="text/javascript" src="src/topic.js"></script>
+<script type="text/javascript" src="src/proxy.js"></script>
 <script type="text/javascript" src="src/consumer.js?timestamp=6"></script>
 <script type="text/javascript" src="src/producer.js"></script>
 <script type="text/javascript" src="src/message.js"></script>
diff --git a/src/main/resources/static/src/app.js 
b/src/main/resources/static/src/app.js
index a7ca1be..1bbb650 100644
--- a/src/main/resources/static/src/app.js
+++ b/src/main/resources/static/src/app.js
@@ -213,6 +213,9 @@ app.config(['$routeProvider', 
'$httpProvider','$cookiesProvider','getDictNamePro
         }).when('/ops', {
             templateUrl: 'view/pages/ops.html',
             controller:'opsController'
+        }).when('/proxy', {
+            templateUrl: 'view/pages/proxy.html',
+            controller:'proxyController'
         }).when('/acl', {
             templateUrl: 'view/pages/acl.html',
             controller: 'aclController'
diff --git a/src/main/resources/static/src/consumer.js 
b/src/main/resources/static/src/consumer.js
index 8c0833e..d192334 100644
--- a/src/main/resources/static/src/consumer.js
+++ b/src/main/resources/static/src/consumer.js
@@ -79,6 +79,7 @@ module.controller('consumerController', ['$scope', 
'ngDialog', '$http', 'Notific
             url: "consumer/groupList.query",
             params: {
                 skipSysGroup: false,
+                address: localStorage.getItem('isV5') ? 
localStorage.getItem('proxyAddr') : null
             }
         }).success(function (resp) {
             if (resp.status == 0) {
@@ -243,11 +244,11 @@ module.controller('consumerController', ['$scope', 
'ngDialog', '$http', 'Notific
             }
         });
     };
-    $scope.detail = function (consumerGroupName) {
+    $scope.detail = function (consumerGroupName, address) {
         $http({
             method: "GET",
             url: "consumer/queryTopicByConsumer.query",
-            params: {consumerGroup: consumerGroupName}
+            params: {consumerGroup: consumerGroupName, address: address}
         }).success(function (resp) {
             if (resp.status == 0) {
                 console.log(resp);
@@ -262,11 +263,11 @@ module.controller('consumerController', ['$scope', 
'ngDialog', '$http', 'Notific
         });
     };
 
-    $scope.client = function (consumerGroupName) {
+    $scope.client = function (consumerGroupName, address) {
         $http({
             method: "GET",
             url: "consumer/consumerConnection.query",
-            params: {consumerGroup: consumerGroupName}
+            params: {consumerGroup: consumerGroupName, address: address}
         }).success(function (resp) {
             if (resp.status == 0) {
                 console.log(resp);
diff --git a/src/main/resources/static/src/i18n/en.js 
b/src/main/resources/static/src/i18n/en.js
index 6bc16cd..83083d7 100644
--- a/src/main/resources/static/src/i18n/en.js
+++ b/src/main/resources/static/src/i18n/en.js
@@ -100,6 +100,7 @@ var en = {
     "RESET_OFFSET":"resetOffset",
     "CLUSTER_NAME":"clusterName",
     "OPS":"OPS",
+    "PROXY":"Proxy",
     "AUTO_REFRESH":"AUTO_REFRESH",
     "REFRESH":"REFRESH",
     "LOGOUT":"Logout",
diff --git a/src/main/resources/static/src/i18n/zh.js 
b/src/main/resources/static/src/i18n/zh.js
index f71ae34..f8c3c1d 100644
--- a/src/main/resources/static/src/i18n/zh.js
+++ b/src/main/resources/static/src/i18n/zh.js
@@ -101,6 +101,7 @@ var zh = {
     "RESET_OFFSET":"重置位点",
     "CLUSTER_NAME":"集群名",
     "OPS":"运维",
+    "PROXY":"代理",
     "AUTO_REFRESH":"自动刷新",
     "REFRESH":"刷新",
     "LOGOUT":"退出",
diff --git a/src/main/resources/static/src/proxy.js 
b/src/main/resources/static/src/proxy.js
new file mode 100644
index 0000000..4461b09
--- /dev/null
+++ b/src/main/resources/static/src/proxy.js
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+var module = app;
+module.controller('proxyController', ['$scope', '$location', '$http', 
'Notification', 'remoteApi', 'tools', '$window',
+    function ($scope, $location, $http, Notification, remoteApi, tools, 
$window) {
+        $scope.proxyAddrList = [];
+        $scope.userRole = $window.sessionStorage.getItem("userrole");
+        $scope.writeOperationEnabled = $scope.userRole == null ? true : 
($scope.userRole == 1 ? true : false);
+        $scope.inputReadonly = !$scope.writeOperationEnabled;
+        $scope.newProxyAddr = "";
+        $scope.allProxyConfig = {};
+
+        $http({
+            method: "GET",
+            url: "proxy/homePage.query"
+        }).success(function (resp) {
+            if (resp.status == 0) {
+                $scope.proxyAddrList = resp.data.proxyAddrList;
+                $scope.selectedProxy = resp.data.currentProxyAddr;
+                $scope.showProxyDetailConfig($scope.selectedProxy);
+                localStorage.setItem('proxyAddr',$scope.selectedProxy);
+            } else {
+                Notification.error({message: resp.errMsg, delay: 2000});
+            }
+        });
+
+        $scope.eleChange = function (data) {
+            $scope.proxyAddrList = data;
+        }
+        $scope.showDetailConf = function () {
+            $(".proxyModal").modal();
+        }
+
+
+        $scope.showProxyDetailConfig = function (proxyAddr) {
+            $http({
+                method: "GET",
+                url: "proxy/proxyDetailConfig.query",
+                params: {proxyAddress: proxyAddr}
+            }).success(function (resp) {
+                if (resp.status == 0) {
+                    $scope.allProxyConfig = resp.data;
+                } else {
+                    Notification.error({message: resp.errMsg, delay: 2000});
+                }
+            });
+        };
+
+        $scope.updateProxyAddr = function () {
+            $http({
+                method: "POST",
+                url: "proxy/updateProxyAddr.do",
+                params: {proxyAddr: $scope.selectedProxy}
+            }).success(function (resp) {
+                if (resp.status == 0) {
+                    localStorage.setItem('proxyAddr', $scope.selectedProxy);
+                    Notification.info({message: "SUCCESS", delay: 2000});
+                } else {
+                    Notification.error({message: resp.errMsg, delay: 2000});
+                }
+            });
+            $scope.showProxyDetailConfig($scope.selectedProxy);
+        };
+
+        $scope.addProxyAddr = function () {
+            $http({
+                method: "POST",
+                url: "proxy/addProxyAddr.do",
+                params: {newProxyAddr: $scope.newProxyAddr}
+            }).success(function (resp) {
+                if (resp.status == 0) {
+                    if ($scope.proxyAddrList.indexOf($scope.newProxyAddr) == 
-1) {
+                        $scope.proxyAddrList.push($scope.newProxyAddr);
+                    }
+                    $("#proxyAddr").val("");
+                    $scope.newProxyAddr = "";
+                    Notification.info({message: "SUCCESS", delay: 2000});
+                } else {
+                    Notification.error({message: resp.errMsg, delay: 2000});
+                }
+            });
+        };
+    }])
diff --git a/src/main/resources/static/view/layout/_header.html 
b/src/main/resources/static/view/layout/_header.html
index a78b9f2..8159138 100644
--- a/src/main/resources/static/view/layout/_header.html
+++ b/src/main/resources/static/view/layout/_header.html
@@ -28,6 +28,7 @@
         <div class="navbar-collapse collapse navbar-warning-collapse">
             <ul class="nav navbar-nav">
                 <li ng-class="path =='ops' ? 'active':''"><a 
ng-href="#/ops">{{'OPS' | translate}}</a></li>
+                <li ng-show="rmqVersion" ng-class="path =='proxy' ? 
'active':''"><a ng-href="#/proxy">{{'PROXY' | translate}}</a></li>
                 <li ng-class="path =='dashboard' || path ==''? 'active':''"><a 
ng-href="#/">{{'DASHBOARD' | translate}}</a></li>
                 <li ng-class="path =='cluster' ? 'active':''"><a 
ng-href="#/cluster">{{'CLUSTER' | translate}}</a></li>
                 <li ng-class="path =='topic' ? 'active':''"><a 
ng-href="#/topic">{{'TOPIC' | translate}}</a></li>
diff --git a/src/main/resources/static/view/pages/consumer.html 
b/src/main/resources/static/view/pages/consumer.html
index 47fddad..d883afc 100644
--- a/src/main/resources/static/view/pages/consumer.html
+++ b/src/main/resources/static/view/pages/consumer.html
@@ -66,11 +66,11 @@
                         <td 
class="text-center">{{consumerGroup.consumeTps}}</td>
                         <td 
class="text-center">{{consumerGroup.diffTotal}}</td>
                         <td class="text-left">
-                            <button name="client" 
ng-click="client(consumerGroup.group)"
+                            <button name="client" 
ng-click="client(consumerGroup.group, consumerGroup.address)"
                                     class="btn btn-raised btn-sm btn-primary"
                                     type="button">{{'CLIENT' | translate}}
                             </button>
-                            <button name="client" 
ng-click="detail(consumerGroup.group)"
+                            <button name="client" 
ng-click="detail(consumerGroup.group, consumerGroup.address)"
                                     class="btn btn-raised btn-sm btn-primary"
                                     type="button">{{'CONSUME_DETAIL' | 
translate}}
                             </button>
diff --git a/src/main/resources/static/view/pages/proxy.html 
b/src/main/resources/static/view/pages/proxy.html
new file mode 100644
index 0000000..43f34ce
--- /dev/null
+++ b/src/main/resources/static/view/pages/proxy.html
@@ -0,0 +1,67 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<div class="container-fluid" id="deployHistoryList">
+    <div class="page-content">
+        <h2 class="md-title">ProxyServerAddressList</h2>
+        <div class="pull-left" style="min-width: 400px; max-width: 500px; 
padding: 10px 10px 10px 0">
+            <select ng-model="selectedProxy" chosen
+                    ng-options="x for x in proxyAddrList"
+                    ng-change="updateProxyAddr()"
+                    required></select>
+        </div>
+        <div class="pull-left">
+            <button class="btn btn-raised btn-sm btn-primary" type="button" 
ng-show="{{writeOperationEnabled}}"
+                    ng-click="updateProxyAddr()">{{'UPDATE' | translate}}
+            </button>
+        </div>
+        <form class="form-inline pull-left" style="margin-left: 20px" 
ng-show="{{writeOperationEnabled}}">
+            <div class="form-group" style="margin: 0">
+                <label for="proxyAddr">ProxyAddr:</label>
+                <input id="proxyAddr" class="form-control" style="width: 
300px; margin: 0 10px 0 10px" type="text" ng-model="newProxyAddr" required/>
+                <button class="btn btn-raised btn-sm btn-primary" type="button"
+                        ng-click="addProxyAddr()"> {{ 'ADD' | translate}}
+                </button>
+            </div>
+        </form>
+    </div>
+</div>
+
+<div class="modal proxyModal fade" role="dialog" tabindex="-1" 
aria-hidden="true" aria-labelledby="config-modal-label">
+    <div class="modal-dialog modal-lg">
+        <div class="modal-content" >
+            <div class="modal-header">
+                <button class="close" type="button" 
data-dismiss="modal">&times;</button>
+                <h4 id="config-modal-label" class="modal-title">
+                    [{{selectedProxy}}]
+                </h4>
+            </div>
+            <div class="modal-body limit_height">
+                <table class="table table-bordered">
+                    <tr ng-repeat="(key, value) in allProxyConfig">
+                        <td>{{key}}</td>
+                        <td>{{value}}</td>
+                    </tr>
+                </table>
+            </div>
+            <div class="modal-footer">
+                <div class="col-md-12 text-center">
+                    <button type="button" class="btn btn-raised" 
data-dismiss="modal">{{ 'CLOSE' | translate }}</button>
+                </div>
+            </div>
+        </div>
+    </div>
+</div>


Reply via email to