This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 59aea8948 [INLONG-4942][TubeMQ] Add the display of the IP address of consumer (#4944)
59aea8948 is described below

commit 59aea8948ad76f4d6d87f24f571c343a80585c7c
Author: Goson Zhang <[email protected]>
AuthorDate: Sat Jul 9 12:16:12 2022 +0800

    [INLONG-4942][TubeMQ] Add the display of the IP address of consumer (#4944)
---
 .../tubemq/server/broker/BrokerServiceServer.java  | 18 ++++++++++------
 .../broker/msgstore/MessageStoreManager.java       |  2 +-
 .../server/broker/nodeinfo/ConsumerNodeInfo.java   | 25 +++++++++++++++-------
 .../server/broker/web/BrokerAdminServlet.java      |  7 +++---
 .../inlong/tubemq/server/master/TMaster.java       |  8 ++++---
 .../nodemanage/nodeconsumer/ConsumerInfo.java      | 14 ++++++++++--
 .../master/web/handler/WebOtherInfoHandler.java    |  3 ++-
 7 files changed, 52 insertions(+), 25 deletions(-)

diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
index 892f6c1c3..a0361398d 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
@@ -868,7 +868,8 @@ public class BrokerServiceServer implements 
BrokerReadService, BrokerWriteServic
                 partLock = brokerRowLock.getLock(null, 
StringUtils.getBytesUtf8(partStr), true);
                 if (request.getOpType() == RpcConstants.MSG_OPTYPE_REGISTER) {
                     return inProcessConsumerRegister(clientId, groupName,
-                            topicName, partStr, filterCondSet, overtls, 
request, builder, strBuffer);
+                            topicName, partStr, filterCondSet, rmtAddress,
+                            overtls, request, builder, strBuffer);
                 } else if (request.getOpType() == 
RpcConstants.MSG_OPTYPE_UNREGISTER) {
                     return inProcessConsumerUnregister(clientId, groupName,
                             topicName, partStr, request, overtls, builder, 
strBuffer);
@@ -906,16 +907,17 @@ public class BrokerServiceServer implements 
BrokerReadService, BrokerWriteServic
      * @param topicName       the topic name
      * @param partStr         the group-topic-partitionId key
      * @param filterCondSet   the filter condition set
+     * @param msgRcvFrom      the address message received
      * @param overtls      whether transfer over TLS
      * @param request         the request
      * @param builder      the response builder
      * @param strBuffer    the string buffer
      * @return             the response
      */
-    private RegisterResponseB2C inProcessConsumerRegister(final String 
clientId, final String groupName,
-                                                          final String 
topicName, final String partStr,
-                                                          final Set<String> 
filterCondSet, boolean overtls,
-                                                          RegisterRequestC2B 
request,
+    private RegisterResponseB2C inProcessConsumerRegister(String clientId, 
String groupName,
+                                                          String topicName, 
String partStr,
+                                                          Set<String> 
filterCondSet, String msgRcvFrom,
+                                                          boolean overtls, 
RegisterRequestC2B request,
                                                           
RegisterResponseB2C.Builder builder,
                                                           StringBuilder 
strBuffer) {
         String consumerId = null;
@@ -929,8 +931,10 @@ public class BrokerServiceServer implements 
BrokerReadService, BrokerWriteServic
             String reqSessionKey = request.hasSessionKey() ? 
request.getSessionKey() : null;
             int reqQryPriorityId = request.hasQryPriorityId()
                     ? request.getQryPriorityId() : 
TBaseConstants.META_VALUE_UNDEFINED;
-            consumerNodeInfo = new ConsumerNodeInfo(storeManager, 
reqQryPriorityId,
-                    clientId, filterCondSet, reqSessionKey, reqSessionTime, 
true, partStr);
+            consumerNodeInfo =
+                    new ConsumerNodeInfo(storeManager, reqQryPriorityId, 
clientId,
+                            filterCondSet, reqSessionKey, reqSessionTime,
+                            true, partStr, msgRcvFrom);
             if (consumerRegisterMap.put(partStr, consumerNodeInfo) == null) {
                 BrokerSrvStatsHolder.incConsumerOnlineCnt();
             }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
index 6f10324a8..1539c9294 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
@@ -367,7 +367,7 @@ public class MessageStoreManager implements StoreService {
             final long maxOffset = msgStore.getIndexMaxOffset();
             ConsumerNodeInfo consumerNodeInfo =
                     new ConsumerNodeInfo(tubeBroker.getStoreManager(),
-                            "visit", filterCondSet, "", 
System.currentTimeMillis(), "");
+                            "visit", filterCondSet, "", 
System.currentTimeMillis(), "", "");
             int maxIndexReadSize = (msgCount + 1)
                     * DataStoreUtils.STORE_INDEX_HEAD_LEN * 
msgStore.getPartitionNum();
             if (filterCondSet != null && !filterCondSet.isEmpty()) {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java
index 487b95ec6..2071ac474 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java
@@ -45,6 +45,7 @@ public class ConsumerNodeInfo {
     // filter conditions in int format
     private final Set<Integer> filterCondCode = new HashSet<>(10);
     // consumer's address
+    private String addrRcvFrom;
     private String rmtAddrInfo;
     private boolean isSupportLimit = false;
     private long nextStatTime = 0L;
@@ -70,12 +71,14 @@ public class ConsumerNodeInfo {
      * @param sessionKey      the session key
      * @param sessionTime     the session create time
      * @param partStr         the partition information
+     * @param msgRcvFrom      the address message received
      */
-    public ConsumerNodeInfo(final MessageStoreManager storeManager,
-                            final String consumerId, Set<String> filterCodes,
-                            final String sessionKey, long sessionTime, final 
String partStr) {
+    public ConsumerNodeInfo(MessageStoreManager storeManager,
+                            String consumerId, Set<String> filterCodes,
+                            String sessionKey, long sessionTime, String 
partStr,
+                            String msgRcvFrom) {
         this(storeManager, TBaseConstants.META_VALUE_UNDEFINED, consumerId,
-            filterCodes, sessionKey, sessionTime, false, partStr);
+                filterCodes, sessionKey, sessionTime, false, partStr, 
msgRcvFrom);
     }
 
     /**
@@ -89,12 +92,13 @@ public class ConsumerNodeInfo {
      * @param sessionTime        the session create time
      * @param isSupportLimit     whether to support limited consumption 
function
      * @param partStr            the partition information
+     * @param msgRcvFrom         the address message received
      */
-    public ConsumerNodeInfo(final MessageStoreManager storeManager,
-                            final int qryPriorityId, final String consumerId,
-                            Set<String> filterCodes, final String sessionKey,
+    public ConsumerNodeInfo(MessageStoreManager storeManager,
+                            int qryPriorityId, String consumerId,
+                            Set<String> filterCodes, String sessionKey,
                             long sessionTime, boolean isSupportLimit,
-                            final String partStr) {
+                            String partStr, String msgRcvFrom) {
         setConsumerId(consumerId);
         if (filterCodes != null) {
             for (String filterItem : filterCodes) {
@@ -107,6 +111,7 @@ public class ConsumerNodeInfo {
         this.qryPriorityId.set(qryPriorityId);
         this.storeManager = storeManager;
         this.partStr = partStr;
+        this.addrRcvFrom = msgRcvFrom;
         this.createTime = System.currentTimeMillis();
         if (filterCodes != null && !filterCodes.isEmpty()) {
             this.isFilterConsume = true;
@@ -245,6 +250,10 @@ public class ConsumerNodeInfo {
         return sessionTime;
     }
 
+    public String getAddrRcvFrom() {
+        return addrRcvFrom;
+    }
+
     public void setLastProcInfo(long lastGetTime, long lastRdDataOffset, int 
totalMsgSize) {
         this.lastGetTime = lastGetTime;
         this.lastDataRdOffset = lastRdDataOffset;
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java
index ab00b5bca..1f4b92cd6 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java
@@ -210,7 +210,8 @@ public class BrokerAdminServlet extends AbstractWebHandler {
                         .append(regTime).append(",\"isFilterConsume\":")
                         .append(ifFilterConsume);
             }
-            
strBuff.append(",\"qryPriorityId\":").append(entry.getValue().getQryPriorityId())
+            
strBuff.append(",\"receivedFrom\":\"").append(entry.getValue().getAddrRcvFrom())
+                    
.append("\",\"qryPriorityId\":").append(entry.getValue().getQryPriorityId())
                     
.append(",\"curDataLimitInM\":").append(entry.getValue().getCurFlowCtrlLimitSize())
                     
.append(",\"curFreqLimit\":").append(entry.getValue().getCurFlowCtrlFreqLimit())
                     
.append(",\"totalSentSec\":").append(entry.getValue().getSentMsgSize())
@@ -777,7 +778,7 @@ public class BrokerAdminServlet extends AbstractWebHandler {
         // get the maximum query turns
         if (!WebParameterUtils.getIntParamValue(req,
                 WebFieldDef.MAXRETRYCOUNT, false,
-                2, 1, 5, sBuffer, result)) {
+                2, 1, 30, sBuffer, result)) {
             WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
             return;
         }
@@ -853,7 +854,7 @@ public class BrokerAdminServlet extends AbstractWebHandler {
         filterCodes.add(groupName);
         // build consumer node information
         ConsumerNodeInfo consumerNodeInfo = new 
ConsumerNodeInfo(broker.getStoreManager(),
-                "offsetConsumer", filterCodes, "", System.currentTimeMillis(), 
"");
+                "offsetConsumer", filterCodes, "", System.currentTimeMillis(), 
"", "");
         // query records from storage
         int qryRetryCount = 0;
         long itemInitOffset = requestOffset;
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
index edef4c269..2d38c5c18 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
@@ -516,7 +516,7 @@ public class TMaster extends HasThread implements 
MasterService, Stoppable {
      */
     @Override
     public RegisterResponseM2C consumerRegisterC2M(RegisterRequestC2M request,
-                                                   final String rmtAddress,
+                                                   String rmtAddress,
                                                    boolean overtls) throws 
Exception {
         // #lizard forgives
         ProcessResult result = new ProcessResult();
@@ -592,7 +592,8 @@ public class TMaster extends HasThread implements 
MasterService, Stoppable {
         ConsumerInfo inConsumerInfo =
                 new ConsumerInfo(consumerId, overtls, groupName,
                         reqTopicSet, reqTopicConditions, csmType,
-                        sessionKey, sessionTime, sourceCount, isSelectBig, 
requiredPartMap);
+                        sessionKey, sessionTime, sourceCount,
+                        isSelectBig, requiredPartMap, rmtAddress);
         paramCheckResult =
                 PBParameterUtils.checkConsumerInputInfo(inConsumerInfo,
                         masterConfig, defMetaDataService, brokerRunManager, 
strBuffer);
@@ -1269,7 +1270,8 @@ public class TMaster extends HasThread implements 
MasterService, Stoppable {
         ConsumerInfo inConsumerInfo =
                 new ConsumerInfo(consumerId, overtls, groupName, csmType,
                         sourceCount, nodeId, reqTopicSet, reqTopicConditions,
-                        opsTaskInfo.getCsmFromMaxOffsetCtrlId(), 
clientSyncInfo);
+                        opsTaskInfo.getCsmFromMaxOffsetCtrlId(), 
clientSyncInfo,
+                        rmtAddress);
         // need removed for authorize center begin
         if (!this.defMetaDataService
                 .isConsumeTargetAuthorized(consumerId, groupName,
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfo.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfo.java
index 4828e98d5..2a9eb18ba 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfo.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfo.java
@@ -36,6 +36,7 @@ public class ConsumerInfo implements 
Comparable<ConsumerInfo>, Serializable {
     private final String group;
     private final Set<String> topicSet;
     private final Map<String, TreeSet<String>> topicConditions;
+    private String addrRcvFrom;
     private boolean overTLS = false;
     private long startTime = TBaseConstants.META_VALUE_UNDEFINED;
     private int sourceCount = TBaseConstants.META_VALUE_UNDEFINED;
@@ -64,15 +65,17 @@ public class ConsumerInfo implements 
Comparable<ConsumerInfo>, Serializable {
      * @param sourceCount         the minimum consumer count of consume group
      * @param selectedBig         whether to choose a larger value if there is 
a conflict
      * @param requiredPartition   the required partitions
+     * @param msgRcvFrom          the address received message
      */
     public ConsumerInfo(String consumerId, boolean overTLS, String group,
                         Set<String> topicSet, Map<String, TreeSet<String>> 
topicConditions,
                         ConsumeType consumeType, String sessionKey, long 
startTime,
                         int sourceCount, boolean selectedBig,
-                        Map<String, Long> requiredPartition) {
+                        Map<String, Long> requiredPartition, String 
msgRcvFrom) {
         this.group = group;
         this.consumeType = consumeType;
         this.consumerId = consumerId;
+        this.addrRcvFrom = msgRcvFrom;
         this.overTLS = overTLS;
         this.topicSet = topicSet;
         if (topicConditions == null) {
@@ -101,14 +104,16 @@ public class ConsumerInfo implements 
Comparable<ConsumerInfo>, Serializable {
      * @param topicConditions     the topic filter condition set
      * @param curCsmCtrlId        the node's consume control id
      * @param syncInfo            the consumer report information
+     * @param msgRcvFrom          the address received message
      */
     public ConsumerInfo(String consumerId, boolean overTLS, String group,
                         ConsumeType consumeType, int sourceCount, int nodeId,
                         Set<String> topicSet, Map<String, TreeSet<String>> 
topicConditions,
-                        long curCsmCtrlId, ClientSyncInfo syncInfo) {
+                        long curCsmCtrlId, ClientSyncInfo syncInfo, String 
msgRcvFrom) {
         this.group = group;
         this.consumeType = consumeType;
         this.consumerId = consumerId;
+        this.addrRcvFrom = msgRcvFrom;
         this.overTLS = overTLS;
         this.topicSet = topicSet;
         if (topicConditions == null) {
@@ -132,6 +137,7 @@ public class ConsumerInfo implements 
Comparable<ConsumerInfo>, Serializable {
             return;
         }
         this.overTLS = inCsmInfo.overTLS;
+        this.addrRcvFrom = inCsmInfo.addrRcvFrom;
         this.nodeId = inCsmInfo.getNodeId();
         updClientReportInfo(inCsmInfo.getCsmFromMaxOffsetCtrlId(),
                 inCsmInfo.getLstAssignedTime(), 
inCsmInfo.getUsedTopicMetaInfoId());
@@ -211,6 +217,10 @@ public class ConsumerInfo implements 
Comparable<ConsumerInfo>, Serializable {
         return topicConditions;
     }
 
+    public String getAddrRcvFrom() {
+        return addrRcvFrom;
+    }
+
     public boolean isOverTLS() {
         return overTLS;
     }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebOtherInfoHandler.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebOtherInfoHandler.java
index 0d02675af..4418fa68e 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebOtherInfoHandler.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebOtherInfoHandler.java
@@ -496,7 +496,8 @@ public class WebOtherInfoHandler extends AbstractWebHandler 
{
                     strBuffer.append(",");
                 }
                 
strBuffer.append("{\"consumerId\":\"").append(consumer.getConsumerId())
-                        
.append("\"").append(",\"isOverTLS\":").append(consumer.isOverTLS());
+                        
.append("\",\"receivedFrom\":\"").append(consumer.getAddrRcvFrom())
+                        
.append("\",\"isOverTLS\":").append(consumer.isOverTLS());
                 if (consumeType == ConsumeType.CONSUME_BAND) {
                     Map<String, Long> requiredPartition = 
consumer.getRequiredPartition();
                     if (requiredPartition == null || 
requiredPartition.isEmpty()) {

Reply via email to