This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 3fa0cd9b8 [INLONG-5088][Manager] Support only consumes the MQ cluster
with the same tag (#5090)
3fa0cd9b8 is described below
commit 3fa0cd9b8fe8284a58f6fb3fef59132f91d7c23a
Author: vernedeng <[email protected]>
AuthorDate: Mon Jul 18 14:20:23 2022 +0800
[INLONG-5088][Manager] Support only consumes the MQ cluster with the same
tag (#5090)
---
.../service/core/impl/SortSourceServiceImpl.java | 32 +++++++++++++++++-----
1 file changed, 25 insertions(+), 7 deletions(-)
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
index 4475910aa..123cc47ad 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
@@ -202,25 +202,43 @@ public class SortSourceServiceImpl implements
SortSourceService {
.filter(cluster ->
SUPPORTED_MQ_TYPE.contains(cluster.getType()))
.collect(Collectors.groupingBy(SortSourceClusterInfo::getClusterTags));
+ // group clusters by name.
+ Map<String, SortSourceClusterInfo> name2ClusterInfos =
clusterInfos.stream()
+ .collect(Collectors.toMap(SortSourceClusterInfo::getName, info
-> info, (g1, g2) -> g1));
+
// Prepare CacheZones for each cluster and task
Map<String, Map<String, String>> newMd5Map = new ConcurrentHashMap<>();
Map<String, Map<String, CacheZoneConfig>> newConfigMap = new
ConcurrentHashMap<>();
- groupMap.forEach((cluster, task2Group) -> {
-
+ groupMap.forEach((clusterName, task2Group) -> {
+
+ // if there is no matched cluster name, just skip
+ if (!name2ClusterInfos.containsKey(clusterName)) {
+ return;
+ }
+ // find valid mq cluster list
+ String clusterTag =
name2ClusterInfos.get(clusterName).getClusterTags();
+ final Map<String, List<SortSourceClusterInfo>> validClusterInfos =
new ConcurrentHashMap<>();
+ if (allTag2ClusterInfos.containsKey(clusterTag)) {
+ validClusterInfos.put(clusterTag,
allTag2ClusterInfos.get(clusterTag));
+ } else {
+ validClusterInfos.putAll(allTag2ClusterInfos);
+ }
+
+ // prepare the new config and md5
Map<String, CacheZoneConfig> task2Config = new
ConcurrentHashMap<>();
Map<String, String> task2Md5 = new ConcurrentHashMap<>();
task2Group.forEach((task, groupList) -> {
Map<String, CacheZone> cacheZones;
try {
- cacheZones = this.getCacheZones(groupList,
allId2GroupInfos, allTag2ClusterInfos);
+ cacheZones = this.getCacheZones(groupList,
allId2GroupInfos, validClusterInfos);
} catch (Throwable t) {
- LOGGER.error("fail to get cacheZones of cluster {}, task
{}", cluster, task);
+ LOGGER.error("fail to get cacheZones of clusterName {},
task {}", clusterName, task);
return;
}
CacheZoneConfig config = CacheZoneConfig.builder()
.cacheZones(cacheZones)
- .sortClusterName(cluster)
+ .sortClusterName(clusterName)
.sortTaskId(task)
.build();
String jsonStr = GSON.toJson(config);
@@ -229,8 +247,8 @@ public class SortSourceServiceImpl implements
SortSourceService {
task2Md5.put(task, md5);
});
- newConfigMap.put(cluster, task2Config);
- newMd5Map.put(cluster, task2Md5);
+ newConfigMap.put(clusterName, task2Config);
+ newMd5Map.put(clusterName, task2Md5);
});
sortSourceConfigMap = newConfigMap;