This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 55ae3f850e [INLONG-9222][Manager] SortStandalone configuration support
multiple tags (#9637)
55ae3f850e is described below
commit 55ae3f850e8d1f57d6806d262d0a59c7a43df8b9
Author: vernedeng <[email protected]>
AuthorDate: Wed Jan 31 21:10:09 2024 +0800
[INLONG-9222][Manager] SortStandalone configuration support multiple tags
(#9637)
---
.../sort/standalone/SortSourceClusterInfo.java | 28 ++++++++++++++++++++++
.../service/core/impl/SortSourceServiceImpl.java | 22 +++++++++++------
2 files changed, 43 insertions(+), 7 deletions(-)
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortSourceClusterInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortSourceClusterInfo.java
index 59e7d35386..8e85fd8287 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortSourceClusterInfo.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortSourceClusterInfo.java
@@ -17,7 +17,10 @@
package org.apache.inlong.manager.pojo.sort.standalone;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+
import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableSet;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
@@ -25,9 +28,12 @@ import lombok.Data;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.util.CollectionUtils;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@Data
@@ -46,9 +52,17 @@ public class SortSourceClusterInfo {
String clusterTags;
String extTag;
String extParams;
+ Set<String> clusterTagsSet;
Map<String, String> extTagMap = new ConcurrentHashMap<>();
Map<String, String> extParamsMap = new ConcurrentHashMap<>();
+ public Set<String> getClusterTagsSet() {
+ if (CollectionUtils.isEmpty(clusterTagsSet) &&
StringUtils.isNotBlank(clusterTags)) {
+ clusterTagsSet =
ImmutableSet.copyOf(clusterTags.split(InlongConstants.COMMA));
+ }
+ return clusterTagsSet;
+ }
+
public Map<String, String> getExtParamsMap() {
if (extParamsMap.isEmpty() && extParams != null) {
try {
@@ -84,4 +98,18 @@ public class SortSourceClusterInfo {
String isConsumable = this.getExtTagMap().get(KEY_IS_CONSUMABLE);
return isConsumable == null || "true".equalsIgnoreCase(isConsumable);
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof SortSourceClusterInfo)) {
+ return false;
+ }
+ SortSourceClusterInfo other = (SortSourceClusterInfo) o;
+ return Objects.equals(this.name, other.name)
+ && Objects.equals(this.clusterTags, other.clusterTags)
+ && Objects.equals(this.type, other.type)
+ && Objects.equals(this.extParams, other.extParams)
+ && Objects.equals(this.extTag, other.extTag);
+
+ }
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
index cafe428f37..43e07f985c 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
@@ -37,6 +37,7 @@ import
org.apache.inlong.manager.service.core.SortSourceService;
import com.google.gson.Gson;
import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -197,10 +198,17 @@ public class SortSourceServiceImpl implements
SortSourceService {
.collect(Collectors.toMap(SortSourceClusterInfo::getName, v ->
v));
// group mq clusters by cluster tag
- mqClusters = allClusters.stream()
+ mqClusters = new HashMap<>();
+ allClusters.stream()
.filter(cluster ->
SUPPORTED_MQ_TYPE.contains(cluster.getType()))
.filter(SortSourceClusterInfo::isConsumable)
-
.collect(Collectors.groupingBy(SortSourceClusterInfo::getClusterTags));
+ .forEach(mq -> {
+ Set<String> tags = mq.getClusterTagsSet();
+ tags.forEach(tag -> {
+ List<SortSourceClusterInfo> list =
mqClusters.computeIfAbsent(tag, k -> new ArrayList<>());
+ list.add(mq);
+ });
+ });
// reload all stream sinks, to Map<clusterName, Map<taskName,
List<groupId>>> format
List<SortSourceStreamSinkInfo> allStreamSinks =
configLoader.loadAllStreamSinks();
@@ -302,7 +310,7 @@ public class SortSourceServiceImpl implements
SortSourceService {
List<SortSourceStreamSinkInfo> sinkList) {
Preconditions.expectNotNull(sortClusters.get(clusterName), "sort
cluster should not be NULL");
- String sortClusterTag = sortClusters.get(clusterName).getClusterTags();
+ Set<String> tags = sortClusters.get(clusterName).getClusterTagsSet();
// get group infos
List<SortSourceStreamSinkInfo> sinkInfoList = sinkList.stream()
@@ -315,10 +323,10 @@ public class SortSourceServiceImpl implements
SortSourceService {
Map<String, List<SortSourceStreamSinkInfo>> tag2SinkInfos =
sinkInfoList.stream()
.filter(sink ->
Objects.nonNull(groupInfos.get(sink.getGroupId())))
.filter(sink -> {
- if (StringUtils.isBlank(sortClusterTag)) {
+ if (CollectionUtils.isEmpty(tags)) {
return true;
}
- return
sortClusterTag.equals(groupInfos.get(sink.getGroupId()).getClusterTag());
+ return
tags.contains(groupInfos.get(sink.getGroupId()).getClusterTag());
})
.collect(Collectors.groupingBy(sink -> {
SortSourceGroupInfo groupInfo =
groupInfos.get(sink.getGroupId());
@@ -329,10 +337,10 @@ public class SortSourceServiceImpl implements
SortSourceService {
Map<String, List<SortSourceStreamSinkInfo>> backupTag2SinkInfos =
sinkInfoList.stream()
.filter(sink ->
backupClusterTag.containsKey(sink.getGroupId()))
.filter(sink -> {
- if (StringUtils.isBlank(sortClusterTag)) {
+ if (CollectionUtils.isEmpty(tags)) {
return true;
}
- return
sortClusterTag.equals(backupClusterTag.get(sink.getGroupId()));
+ return
tags.contains(backupClusterTag.get(sink.getGroupId()));
})
.collect(Collectors.groupingBy(info ->
backupClusterTag.get(info.getGroupId())));