This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 7624231f9 [INLONG-4883][TubeMQ] No error report for incorrect topic 
subscription (#4913)
7624231f9 is described below

commit 7624231f9dc52a90c9d9a50c8775fe88beb0ee71
Author: gosonzhang <[email protected]>
AuthorDate: Fri Jul 8 09:32:21 2022 +0800

    [INLONG-4883][TubeMQ] No error report for incorrect topic subscription 
(#4913)
---
 .../inlong/tubemq/corebase/TErrCodeConstants.java  |  1 +
 .../server/common/paramcheck/PBParameterUtils.java | 52 ++++++++++++++--------
 .../server/common/utils/WebParameterUtils.java     |  2 +-
 .../inlong/tubemq/server/master/TMaster.java       | 22 +++++----
 .../master/metamanage/DefaultMetaDataService.java  |  4 +-
 .../server/master/metamanage/MetaDataService.java  |  2 +-
 .../metastore/dao/mapper/MetaConfigMapper.java     |  2 +-
 .../metastore/dao/mapper/TopicDeployMapper.java    |  2 +-
 .../metastore/impl/AbsMetaConfigMapperImpl.java    |  4 +-
 .../metastore/impl/AbsTopicDeployMapperImpl.java   |  2 +-
 .../web/handler/WebAdminGroupCtrlHandler.java      |  6 +--
 .../web/handler/WebGroupConsumeCtrlHandler.java    |  2 +-
 .../tubemq/server/common/PBParameterTest.java      | 30 +++++++++----
 13 files changed, 78 insertions(+), 53 deletions(-)

diff --git 
a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TErrCodeConstants.java
 
b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TErrCodeConstants.java
index 30ff82715..b3487c20c 100644
--- 
a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TErrCodeConstants.java
+++ 
b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TErrCodeConstants.java
@@ -50,6 +50,7 @@ public class TErrCodeConstants {
     public static final int CLIENT_INCONSISTENT_SELECTBIG = 428;
     public static final int CLIENT_INCONSISTENT_SOURCECOUNT = 429;
     public static final int CLIENT_DUPLICATE_INDEXID = 430;
+    public static final int TOPIC_NOT_DEPLOYED = 431;
 
     public static final int CONSUME_GROUP_FORBIDDEN = 450;
     public static final int SERVER_CONSUME_SPEED_LIMIT = 452;
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/paramcheck/PBParameterUtils.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/paramcheck/PBParameterUtils.java
index 3cbadaa1b..9f3a9849e 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/paramcheck/PBParameterUtils.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/paramcheck/PBParameterUtils.java
@@ -95,20 +95,21 @@ public class PBParameterUtils {
     /**
      * Check request topic list of consumer
      *
+     * @param depTopicSet  the deployed topic set
      * @param reqTopicLst the topic list to be checked.
-     * @param strBuffer   a string buffer used to construct the result
+     * @param strBuff   a string buffer used to construct the result
      * @return the check result
      */
-    public static ParamCheckResult checkConsumerTopicList(final List<String> 
reqTopicLst,
-                                                          final StringBuilder 
strBuffer) {
-        ParamCheckResult retResult = new ParamCheckResult();
-        if ((reqTopicLst == null)
-                || (reqTopicLst.isEmpty())) {
-            retResult.setCheckResult(false,
-                    TErrCodeConstants.BAD_REQUEST,
+    public static boolean checkConsumerTopicList(Set<String> depTopicSet,
+                                                 List<String> reqTopicLst,
+                                                 ProcessResult result,
+                                                 StringBuilder strBuff) {
+        if ((reqTopicLst == null) || (reqTopicLst.isEmpty())) {
+            result.setFailResult(TErrCodeConstants.BAD_REQUEST,
                     "Request miss necessary subscribed topicList data!");
-            return retResult;
+            return result.isSuccess();
         }
+        // remove spaces
         Set<String> transTopicSet = new HashSet<>();
         for (String topicItem : reqTopicLst) {
             if (TStringUtils.isBlank(topicItem)) {
@@ -117,21 +118,34 @@ public class PBParameterUtils {
             transTopicSet.add(topicItem.trim());
         }
         if (transTopicSet.isEmpty()) {
-            retResult.setCheckResult(false,
-                    TErrCodeConstants.BAD_REQUEST,
+            result.setFailResult(TErrCodeConstants.BAD_REQUEST,
                     "Request subscribed topicList data must not Blank!");
-            return retResult;
+            return result.isSuccess();
         }
+        // check if exceed max topic count booked
         if (transTopicSet.size() > TBaseConstants.META_MAX_BOOKED_TOPIC_COUNT) 
{
-            retResult.setCheckResult(false,
-                    TErrCodeConstants.BAD_REQUEST,
-                    strBuffer.append("Subscribed topicList size over max 
value, required max count is ")
+            result.setFailResult(TErrCodeConstants.BAD_REQUEST,
+                    strBuff.append("Subscribed topicList size over max value, 
required max count is ")
                             
.append(TBaseConstants.META_MAX_BOOKED_TOPIC_COUNT).toString());
-            strBuffer.delete(0, strBuffer.length());
-            return retResult;
+            strBuff.delete(0, strBuff.length());
+            return result.isSuccess();
         }
-        retResult.setCheckData(transTopicSet);
-        return retResult;
+        // Check if the topics all in deployment
+        Set<String> invalidTopicSet = new HashSet<>();
+        for (String reqTopic : transTopicSet) {
+            if (!depTopicSet.contains(reqTopic)) {
+                invalidTopicSet.add(reqTopic);
+            }
+        }
+        if (!invalidTopicSet.isEmpty()) {
+            result.setFailResult(TErrCodeConstants.TOPIC_NOT_DEPLOYED,
+                    strBuff.append("Requested topic [").append(invalidTopicSet)
+                            .append("] not deployed!").toString());
+            strBuff.delete(0, strBuff.length());
+            return result.isSuccess();
+        }
+        result.setSuccResult(transTopicSet);
+        return result.isSuccess();
     }
 
     /**
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java
index 65c9413b1..e382b0f92 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java
@@ -829,7 +829,7 @@ public class WebParameterUtils {
         }
         Set<String> topicNameSet = (Set<String>) result.getRetData();
         Set<String> existedTopicSet =
-                defMetaDataService.getTotalConfiguredTopicNames();
+                defMetaDataService.getDeployedTopicSet();
         for (String topic : topicNameSet) {
             if (!existedTopicSet.contains(topic)) {
                 
result.setFailResult(sBuffer.append(WebFieldDef.COMPSTOPICNAME.name)
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
index 5a611e804..edef4c269 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
@@ -552,13 +552,13 @@ public class TMaster extends HasThread implements 
MasterService, Stoppable {
             return builder.build();
         }
         final String groupName = (String) paramCheckResult.checkData;
-        paramCheckResult = 
PBParameterUtils.checkConsumerTopicList(request.getTopicListList(), strBuffer);
-        if (!paramCheckResult.result) {
-            builder.setErrCode(paramCheckResult.errCode);
-            builder.setErrMsg(paramCheckResult.errMsg);
+        if 
(!PBParameterUtils.checkConsumerTopicList(defMetaDataService.getDeployedTopicSet(),
+                request.getTopicListList(), result, strBuffer)) {
+            builder.setErrCode(result.getErrCode());
+            builder.setErrMsg(result.getErrMsg());
             return builder.build();
         }
-        Set<String> reqTopicSet = (Set<String>) paramCheckResult.checkData;
+        final Set<String> reqTopicSet = (Set<String>) result.getRetData();
         String requiredParts = request.hasRequiredPartition() ? 
request.getRequiredPartition() : "";
         ConsumeType csmType = (request.hasRequireBound() && 
request.getRequireBound())
                 ? ConsumeType.CONSUME_BAND : ConsumeType.CONSUME_NORMAL;
@@ -1234,15 +1234,13 @@ public class TMaster extends HasThread implements 
MasterService, Stoppable {
             return builder.build();
         }
         final String groupName = (String) paramCheckResult.checkData;
-        paramCheckResult =
-                PBParameterUtils.checkConsumerTopicList(
-                        request.getTopicListList(), sBuffer);
-        if (!paramCheckResult.result) {
-            builder.setErrCode(paramCheckResult.errCode);
-            builder.setErrMsg(paramCheckResult.errMsg);
+        if 
(!PBParameterUtils.checkConsumerTopicList(defMetaDataService.getDeployedTopicSet(),
+                request.getTopicListList(), result, sBuffer)) {
+            builder.setErrCode(result.getErrCode());
+            builder.setErrMsg(result.getErrMsg());
             return builder.build();
         }
-        final Set<String> reqTopicSet = (Set<String>) 
paramCheckResult.checkData;
+        final Set<String> reqTopicSet = (Set<String>) result.getRetData();
         final Map<String, TreeSet<String>> reqTopicConditions =
                 
DataConverterUtil.convertTopicConditions(request.getTopicConditionList());
         int sourceCount = request.getSourceCount();
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java
index f2c707280..273604b5c 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java
@@ -556,8 +556,8 @@ public class DefaultMetaDataService implements 
MetaDataService {
     }
 
     @Override
-    public Set<String> getTotalConfiguredTopicNames() {
-        return metaConfigMapper.getConfiguredTopicSet();
+    public Set<String> getDeployedTopicSet() {
+        return metaConfigMapper.getDeployedTopicSet();
     }
 
     @Override
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataService.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataService.java
index e6e382a5b..95f94270b 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataService.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataService.java
@@ -269,7 +269,7 @@ public interface MetaDataService extends Server {
      *
      * @return  the deployed topic set
      */
-    Set<String> getTotalConfiguredTopicNames();
+    Set<String> getDeployedTopicSet();
 
     /**
      * Get broker configure entity by broker id
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaConfigMapper.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaConfigMapper.java
index 0ce5456b7..8acfb75d9 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaConfigMapper.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaConfigMapper.java
@@ -361,7 +361,7 @@ public interface MetaConfigMapper extends KeepAliveService {
      *
      * @return  the deployed topic set
      */
-    Set<String> getConfiguredTopicSet();
+    Set<String> getDeployedTopicSet();
 
     // ////////////////////////////////////////////////////////////
 
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java
index 05cb02bdf..26d782b9f 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java
@@ -118,6 +118,6 @@ public interface TopicDeployMapper extends AbstractMapper {
 
     Map<String/* topicName */, Map<Integer, String>> 
getTopicBrokerInfo(Set<String> topicNameSet);
 
-    Set<String> getConfiguredTopicSet();
+    Set<String> getDeployedTopicSet();
 
 }
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java
index d2d39a033..9e1421508 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java
@@ -869,8 +869,8 @@ public abstract class AbsMetaConfigMapperImpl implements 
MetaConfigMapper {
     }
 
     @Override
-    public Set<String> getConfiguredTopicSet() {
-        return topicDeployMapper.getConfiguredTopicSet();
+    public Set<String> getDeployedTopicSet() {
+        return topicDeployMapper.getDeployedTopicSet();
     }
 
     // 
//////////////////////////////////////////////////////////////////////////////
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicDeployMapperImpl.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicDeployMapperImpl.java
index b0b40c7e6..5ed2e5102 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicDeployMapperImpl.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicDeployMapperImpl.java
@@ -447,7 +447,7 @@ public abstract class AbsTopicDeployMapperImpl implements 
TopicDeployMapper {
     }
 
     @Override
-    public Set<String> getConfiguredTopicSet() {
+    public Set<String> getDeployedTopicSet() {
         return new HashSet<>(topicName2RecordCache.keySet());
     }
 
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
index dc9f688dc..902a8b371 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
@@ -1279,7 +1279,7 @@ public class WebAdminGroupCtrlHandler extends 
AbstractWebHandler {
         GroupConsumeCtrlEntity itemEntity;
         Map<String, GroupConsumeCtrlEntity> addRecordMap = new HashMap<>();
         Set<String> configuredTopicSet =
-                defMetaDataService.getTotalConfiguredTopicNames();
+                defMetaDataService.getDeployedTopicSet();
         for (Map<String, String> itemValueMap : groupJsonArray) {
             // check and get operation info
             if (!WebParameterUtils.getAUDBaseInfo(itemValueMap,
@@ -1349,7 +1349,7 @@ public class WebAdminGroupCtrlHandler extends 
AbstractWebHandler {
         Map<String, String> itemValueMap;
         Map<String, GroupResCtrlEntity> addRecordMap = new HashMap<>();
         Set<String> configuredTopicSet =
-                defMetaDataService.getTotalConfiguredTopicNames();
+                defMetaDataService.getDeployedTopicSet();
         for (int j = 0; j < groupJsonArray.size(); j++) {
             itemValueMap = groupJsonArray.get(j);
             // check and get operation info
@@ -1409,7 +1409,7 @@ public class WebAdminGroupCtrlHandler extends 
AbstractWebHandler {
         GroupConsumeCtrlEntity itemEntity;
         Map<String, GroupConsumeCtrlEntity> addRecordMap = new HashMap<>();
         Set<String> configuredTopicSet =
-                defMetaDataService.getTotalConfiguredTopicNames();
+                defMetaDataService.getDeployedTopicSet();
         for (Map<String, String> itemValueMap : groupJsonArray) {
             // check and get operation info
             if (!WebParameterUtils.getAUDBaseInfo(itemValueMap,
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java
index 18b527677..347a46183 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java
@@ -421,7 +421,7 @@ public class WebGroupConsumeCtrlHandler extends 
AbstractWebHandler {
         GroupConsumeCtrlEntity itemConf;
         Map<String, GroupConsumeCtrlEntity> addRecordMap = new HashMap<>();
         Set<String> configuredTopicSet =
-                defMetaDataService.getTotalConfiguredTopicNames();
+                defMetaDataService.getDeployedTopicSet();
         for (Map<String, String> itemsMap : filterJsonArray) {
             // check and get operation info
             if (!WebParameterUtils.getAUDBaseInfo(itemsMap,
diff --git 
a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/common/PBParameterTest.java
 
b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/common/PBParameterTest.java
index da69dd8d0..5ddced8c7 100644
--- 
a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/common/PBParameterTest.java
+++ 
b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/common/PBParameterTest.java
@@ -18,8 +18,11 @@
 package org.apache.inlong.tubemq.server.common;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
+import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
 import org.apache.inlong.tubemq.server.common.paramcheck.PBParameterUtils;
 import org.apache.inlong.tubemq.server.common.paramcheck.ParamCheckResult;
 import org.junit.Assert;
@@ -43,17 +46,26 @@ public class PBParameterTest {
 
     @Test
     public void checkConsumerTopicTest() {
-        ParamCheckResult result = 
PBParameterUtils.checkConsumerTopicList(null, null);
-        Assert.assertEquals(result.errCode, TErrCodeConstants.BAD_REQUEST);
-        final List<String> topicList = new ArrayList<>();
-        topicList.add("test1");
-        result = PBParameterUtils.checkConsumerTopicList(topicList, new 
StringBuilder(128));
-        Assert.assertEquals(result.errCode, TErrCodeConstants.SUCCESS);
+        ProcessResult result = new ProcessResult();
+        PBParameterUtils.checkConsumerTopicList(null, null, result, null);
+        Assert.assertEquals(result.getErrCode(), 
TErrCodeConstants.BAD_REQUEST);
+        final Set<String> depTopicList = new HashSet<>();
+        final List<String> reqTopicList = new ArrayList<>();
+        depTopicList.add("test1");
+        reqTopicList.add("test1");
+        PBParameterUtils.checkConsumerTopicList(depTopicList,
+                reqTopicList, result, new StringBuilder(128));
+        Assert.assertEquals(result.getErrCode(), TErrCodeConstants.SUCCESS);
+        reqTopicList.add("test2");
+        PBParameterUtils.checkConsumerTopicList(depTopicList,
+                reqTopicList, result, new StringBuilder(128));
+        Assert.assertEquals(result.getErrCode(), 
TErrCodeConstants.TOPIC_NOT_DEPLOYED);
         for (int i = 0; i < 1025; i++) {
-            topicList.add("test" + i);
+            reqTopicList.add("test" + i);
         }
-        result = PBParameterUtils.checkConsumerTopicList(topicList, new 
StringBuilder(128));
-        Assert.assertEquals(result.errCode, TErrCodeConstants.BAD_REQUEST);
+        PBParameterUtils.checkConsumerTopicList(depTopicList,
+                reqTopicList, result, new StringBuilder(128));
+        Assert.assertEquals(result.getErrCode(), 
TErrCodeConstants.BAD_REQUEST);
     }
 
     @Test

Reply via email to