This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 7624231f9 [INLONG-4883][TubeMQ] No error report for incorrect topic
subscription (#4913)
7624231f9 is described below
commit 7624231f9dc52a90c9d9a50c8775fe88beb0ee71
Author: gosonzhang <[email protected]>
AuthorDate: Fri Jul 8 09:32:21 2022 +0800
[INLONG-4883][TubeMQ] No error report for incorrect topic subscription
(#4913)
---
.../inlong/tubemq/corebase/TErrCodeConstants.java | 1 +
.../server/common/paramcheck/PBParameterUtils.java | 52 ++++++++++++++--------
.../server/common/utils/WebParameterUtils.java | 2 +-
.../inlong/tubemq/server/master/TMaster.java | 22 +++++----
.../master/metamanage/DefaultMetaDataService.java | 4 +-
.../server/master/metamanage/MetaDataService.java | 2 +-
.../metastore/dao/mapper/MetaConfigMapper.java | 2 +-
.../metastore/dao/mapper/TopicDeployMapper.java | 2 +-
.../metastore/impl/AbsMetaConfigMapperImpl.java | 4 +-
.../metastore/impl/AbsTopicDeployMapperImpl.java | 2 +-
.../web/handler/WebAdminGroupCtrlHandler.java | 6 +--
.../web/handler/WebGroupConsumeCtrlHandler.java | 2 +-
.../tubemq/server/common/PBParameterTest.java | 30 +++++++++----
13 files changed, 78 insertions(+), 53 deletions(-)
diff --git
a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TErrCodeConstants.java
b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TErrCodeConstants.java
index 30ff82715..b3487c20c 100644
---
a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TErrCodeConstants.java
+++
b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TErrCodeConstants.java
@@ -50,6 +50,7 @@ public class TErrCodeConstants {
public static final int CLIENT_INCONSISTENT_SELECTBIG = 428;
public static final int CLIENT_INCONSISTENT_SOURCECOUNT = 429;
public static final int CLIENT_DUPLICATE_INDEXID = 430;
+ public static final int TOPIC_NOT_DEPLOYED = 431;
public static final int CONSUME_GROUP_FORBIDDEN = 450;
public static final int SERVER_CONSUME_SPEED_LIMIT = 452;
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/paramcheck/PBParameterUtils.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/paramcheck/PBParameterUtils.java
index 3cbadaa1b..9f3a9849e 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/paramcheck/PBParameterUtils.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/paramcheck/PBParameterUtils.java
@@ -95,20 +95,21 @@ public class PBParameterUtils {
/**
* Check request topic list of consumer
*
+ * @param depTopicSet the deployed topic set
* @param reqTopicLst the topic list to be checked.
- * @param strBuffer a string buffer used to construct the result
+ * @param strBuff a string buffer used to construct the result
* @return the check result
*/
- public static ParamCheckResult checkConsumerTopicList(final List<String>
reqTopicLst,
- final StringBuilder
strBuffer) {
- ParamCheckResult retResult = new ParamCheckResult();
- if ((reqTopicLst == null)
- || (reqTopicLst.isEmpty())) {
- retResult.setCheckResult(false,
- TErrCodeConstants.BAD_REQUEST,
+ public static boolean checkConsumerTopicList(Set<String> depTopicSet,
+ List<String> reqTopicLst,
+ ProcessResult result,
+ StringBuilder strBuff) {
+ if ((reqTopicLst == null) || (reqTopicLst.isEmpty())) {
+ result.setFailResult(TErrCodeConstants.BAD_REQUEST,
"Request miss necessary subscribed topicList data!");
- return retResult;
+ return result.isSuccess();
}
+ // trim each requested topic name and collect the results into a set
Set<String> transTopicSet = new HashSet<>();
for (String topicItem : reqTopicLst) {
if (TStringUtils.isBlank(topicItem)) {
@@ -117,21 +118,34 @@ public class PBParameterUtils {
transTopicSet.add(topicItem.trim());
}
if (transTopicSet.isEmpty()) {
- retResult.setCheckResult(false,
- TErrCodeConstants.BAD_REQUEST,
+ result.setFailResult(TErrCodeConstants.BAD_REQUEST,
"Request subscribed topicList data must not Blank!");
- return retResult;
+ return result.isSuccess();
}
+ // check whether the number of subscribed topics exceeds the allowed maximum
if (transTopicSet.size() > TBaseConstants.META_MAX_BOOKED_TOPIC_COUNT)
{
- retResult.setCheckResult(false,
- TErrCodeConstants.BAD_REQUEST,
- strBuffer.append("Subscribed topicList size over max
value, required max count is ")
+ result.setFailResult(TErrCodeConstants.BAD_REQUEST,
+ strBuff.append("Subscribed topicList size over max value,
required max count is ")
.append(TBaseConstants.META_MAX_BOOKED_TOPIC_COUNT).toString());
- strBuffer.delete(0, strBuffer.length());
- return retResult;
+ strBuff.delete(0, strBuff.length());
+ return result.isSuccess();
}
- retResult.setCheckData(transTopicSet);
- return retResult;
+ // check whether all requested topics are deployed
+ Set<String> invalidTopicSet = new HashSet<>();
+ for (String reqTopic : transTopicSet) {
+ if (!depTopicSet.contains(reqTopic)) {
+ invalidTopicSet.add(reqTopic);
+ }
+ }
+ if (!invalidTopicSet.isEmpty()) {
+ result.setFailResult(TErrCodeConstants.TOPIC_NOT_DEPLOYED,
+ strBuff.append("Requested topic [").append(invalidTopicSet)
+ .append("] not deployed!").toString());
+ strBuff.delete(0, strBuff.length());
+ return result.isSuccess();
+ }
+ result.setSuccResult(transTopicSet);
+ return result.isSuccess();
}
/**
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java
index 65c9413b1..e382b0f92 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java
@@ -829,7 +829,7 @@ public class WebParameterUtils {
}
Set<String> topicNameSet = (Set<String>) result.getRetData();
Set<String> existedTopicSet =
- defMetaDataService.getTotalConfiguredTopicNames();
+ defMetaDataService.getDeployedTopicSet();
for (String topic : topicNameSet) {
if (!existedTopicSet.contains(topic)) {
result.setFailResult(sBuffer.append(WebFieldDef.COMPSTOPICNAME.name)
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
index 5a611e804..edef4c269 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
@@ -552,13 +552,13 @@ public class TMaster extends HasThread implements
MasterService, Stoppable {
return builder.build();
}
final String groupName = (String) paramCheckResult.checkData;
- paramCheckResult =
PBParameterUtils.checkConsumerTopicList(request.getTopicListList(), strBuffer);
- if (!paramCheckResult.result) {
- builder.setErrCode(paramCheckResult.errCode);
- builder.setErrMsg(paramCheckResult.errMsg);
+ if
(!PBParameterUtils.checkConsumerTopicList(defMetaDataService.getDeployedTopicSet(),
+ request.getTopicListList(), result, strBuffer)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
- Set<String> reqTopicSet = (Set<String>) paramCheckResult.checkData;
+ final Set<String> reqTopicSet = (Set<String>) result.getRetData();
String requiredParts = request.hasRequiredPartition() ?
request.getRequiredPartition() : "";
ConsumeType csmType = (request.hasRequireBound() &&
request.getRequireBound())
? ConsumeType.CONSUME_BAND : ConsumeType.CONSUME_NORMAL;
@@ -1234,15 +1234,13 @@ public class TMaster extends HasThread implements
MasterService, Stoppable {
return builder.build();
}
final String groupName = (String) paramCheckResult.checkData;
- paramCheckResult =
- PBParameterUtils.checkConsumerTopicList(
- request.getTopicListList(), sBuffer);
- if (!paramCheckResult.result) {
- builder.setErrCode(paramCheckResult.errCode);
- builder.setErrMsg(paramCheckResult.errMsg);
+ if
(!PBParameterUtils.checkConsumerTopicList(defMetaDataService.getDeployedTopicSet(),
+ request.getTopicListList(), result, sBuffer)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
- final Set<String> reqTopicSet = (Set<String>)
paramCheckResult.checkData;
+ final Set<String> reqTopicSet = (Set<String>) result.getRetData();
final Map<String, TreeSet<String>> reqTopicConditions =
DataConverterUtil.convertTopicConditions(request.getTopicConditionList());
int sourceCount = request.getSourceCount();
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java
index f2c707280..273604b5c 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java
@@ -556,8 +556,8 @@ public class DefaultMetaDataService implements
MetaDataService {
}
@Override
- public Set<String> getTotalConfiguredTopicNames() {
- return metaConfigMapper.getConfiguredTopicSet();
+ public Set<String> getDeployedTopicSet() {
+ return metaConfigMapper.getDeployedTopicSet();
}
@Override
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataService.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataService.java
index e6e382a5b..95f94270b 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataService.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataService.java
@@ -269,7 +269,7 @@ public interface MetaDataService extends Server {
*
* @return the deployed topic set
*/
- Set<String> getTotalConfiguredTopicNames();
+ Set<String> getDeployedTopicSet();
/**
* Get broker configure entity by broker id
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaConfigMapper.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaConfigMapper.java
index 0ce5456b7..8acfb75d9 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaConfigMapper.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaConfigMapper.java
@@ -361,7 +361,7 @@ public interface MetaConfigMapper extends KeepAliveService {
*
* @return the deployed topic set
*/
- Set<String> getConfiguredTopicSet();
+ Set<String> getDeployedTopicSet();
// ////////////////////////////////////////////////////////////
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java
index 05cb02bdf..26d782b9f 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java
@@ -118,6 +118,6 @@ public interface TopicDeployMapper extends AbstractMapper {
Map<String/* topicName */, Map<Integer, String>>
getTopicBrokerInfo(Set<String> topicNameSet);
- Set<String> getConfiguredTopicSet();
+ Set<String> getDeployedTopicSet();
}
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java
index d2d39a033..9e1421508 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java
@@ -869,8 +869,8 @@ public abstract class AbsMetaConfigMapperImpl implements
MetaConfigMapper {
}
@Override
- public Set<String> getConfiguredTopicSet() {
- return topicDeployMapper.getConfiguredTopicSet();
+ public Set<String> getDeployedTopicSet() {
+ return topicDeployMapper.getDeployedTopicSet();
}
//
//////////////////////////////////////////////////////////////////////////////
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicDeployMapperImpl.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicDeployMapperImpl.java
index b0b40c7e6..5ed2e5102 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicDeployMapperImpl.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicDeployMapperImpl.java
@@ -447,7 +447,7 @@ public abstract class AbsTopicDeployMapperImpl implements
TopicDeployMapper {
}
@Override
- public Set<String> getConfiguredTopicSet() {
+ public Set<String> getDeployedTopicSet() {
return new HashSet<>(topicName2RecordCache.keySet());
}
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
index dc9f688dc..902a8b371 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
@@ -1279,7 +1279,7 @@ public class WebAdminGroupCtrlHandler extends
AbstractWebHandler {
GroupConsumeCtrlEntity itemEntity;
Map<String, GroupConsumeCtrlEntity> addRecordMap = new HashMap<>();
Set<String> configuredTopicSet =
- defMetaDataService.getTotalConfiguredTopicNames();
+ defMetaDataService.getDeployedTopicSet();
for (Map<String, String> itemValueMap : groupJsonArray) {
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(itemValueMap,
@@ -1349,7 +1349,7 @@ public class WebAdminGroupCtrlHandler extends
AbstractWebHandler {
Map<String, String> itemValueMap;
Map<String, GroupResCtrlEntity> addRecordMap = new HashMap<>();
Set<String> configuredTopicSet =
- defMetaDataService.getTotalConfiguredTopicNames();
+ defMetaDataService.getDeployedTopicSet();
for (int j = 0; j < groupJsonArray.size(); j++) {
itemValueMap = groupJsonArray.get(j);
// check and get operation info
@@ -1409,7 +1409,7 @@ public class WebAdminGroupCtrlHandler extends
AbstractWebHandler {
GroupConsumeCtrlEntity itemEntity;
Map<String, GroupConsumeCtrlEntity> addRecordMap = new HashMap<>();
Set<String> configuredTopicSet =
- defMetaDataService.getTotalConfiguredTopicNames();
+ defMetaDataService.getDeployedTopicSet();
for (Map<String, String> itemValueMap : groupJsonArray) {
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(itemValueMap,
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java
index 18b527677..347a46183 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java
@@ -421,7 +421,7 @@ public class WebGroupConsumeCtrlHandler extends
AbstractWebHandler {
GroupConsumeCtrlEntity itemConf;
Map<String, GroupConsumeCtrlEntity> addRecordMap = new HashMap<>();
Set<String> configuredTopicSet =
- defMetaDataService.getTotalConfiguredTopicNames();
+ defMetaDataService.getDeployedTopicSet();
for (Map<String, String> itemsMap : filterJsonArray) {
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(itemsMap,
diff --git
a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/common/PBParameterTest.java
b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/common/PBParameterTest.java
index da69dd8d0..5ddced8c7 100644
---
a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/common/PBParameterTest.java
+++
b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/common/PBParameterTest.java
@@ -18,8 +18,11 @@
package org.apache.inlong.tubemq.server.common;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
+import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
import org.apache.inlong.tubemq.server.common.paramcheck.PBParameterUtils;
import org.apache.inlong.tubemq.server.common.paramcheck.ParamCheckResult;
import org.junit.Assert;
@@ -43,17 +46,26 @@ public class PBParameterTest {
@Test
public void checkConsumerTopicTest() {
- ParamCheckResult result =
PBParameterUtils.checkConsumerTopicList(null, null);
- Assert.assertEquals(result.errCode, TErrCodeConstants.BAD_REQUEST);
- final List<String> topicList = new ArrayList<>();
- topicList.add("test1");
- result = PBParameterUtils.checkConsumerTopicList(topicList, new
StringBuilder(128));
- Assert.assertEquals(result.errCode, TErrCodeConstants.SUCCESS);
+ ProcessResult result = new ProcessResult();
+ PBParameterUtils.checkConsumerTopicList(null, null, result, null);
+ Assert.assertEquals(result.getErrCode(),
TErrCodeConstants.BAD_REQUEST);
+ final Set<String> depTopicList = new HashSet<>();
+ final List<String> reqTopicList = new ArrayList<>();
+ depTopicList.add("test1");
+ reqTopicList.add("test1");
+ PBParameterUtils.checkConsumerTopicList(depTopicList,
+ reqTopicList, result, new StringBuilder(128));
+ Assert.assertEquals(result.getErrCode(), TErrCodeConstants.SUCCESS);
+ reqTopicList.add("test2");
+ PBParameterUtils.checkConsumerTopicList(depTopicList,
+ reqTopicList, result, new StringBuilder(128));
+ Assert.assertEquals(result.getErrCode(),
TErrCodeConstants.TOPIC_NOT_DEPLOYED);
for (int i = 0; i < 1025; i++) {
- topicList.add("test" + i);
+ reqTopicList.add("test" + i);
}
- result = PBParameterUtils.checkConsumerTopicList(topicList, new
StringBuilder(128));
- Assert.assertEquals(result.errCode, TErrCodeConstants.BAD_REQUEST);
+ PBParameterUtils.checkConsumerTopicList(depTopicList,
+ reqTopicList, result, new StringBuilder(128));
+ Assert.assertEquals(result.getErrCode(),
TErrCodeConstants.BAD_REQUEST);
}
@Test