This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
commit b497de8a583f61faa991195b7aee263b3c4de900 Author: XiaoxiangYu <hit_la...@126.com> AuthorDate: Tue Jun 2 10:53:27 2020 +0800 KYLIN-4355 Add validation for cube assignment --- .../kylin/rest/service/StreamingV2Service.java | 24 ++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java index 0141e01..3c88b79 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java @@ -21,6 +21,7 @@ package org.apache.kylin.rest.service; import java.io.IOException; import java.net.InetAddress; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -30,6 +31,8 @@ import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; @@ -75,7 +78,7 @@ import org.apache.kylin.shaded.com.google.common.collect.Maps; import org.apache.kylin.shaded.com.google.common.collect.Sets; /** - * StreamingCoordinatorService will try to forward request to corrdinator leader by HttpClient. + * StreamingCoordinatorService will try to forward request to coordinator leader. */ @Component("streamingServiceV2") public class StreamingV2Service extends BasicService { @@ -227,11 +230,28 @@ public class StreamingV2Service extends BasicService { private void validateAssignment(CubeAssignment newAssignment) { Map<Integer, List<Partition>> assignments = newAssignment.getAssignments(); + Map<Integer, Set<Partition>> assignmentSet = assignments.keySet().stream().collect( + Collectors.toMap(Function.identity(), HashSet::new)); + Set<Integer> inputReplicaSetIDs = assignments.keySet(); Set<Integer> allReplicaSetIDs = Sets.newHashSet(streamMetadataStore.getReplicaSetIDs()); for (Integer inputReplicaSetID : inputReplicaSetIDs) { if (!allReplicaSetIDs.contains(inputReplicaSetID)) { - throw new IllegalArgumentException("the replica set id:" + inputReplicaSetID + " does not exist"); + throw new IllegalArgumentException("The replica set id:" + inputReplicaSetID + " does not exist"); + } + + Set<Partition> partitionSet = assignmentSet.get(inputReplicaSetID); + if (partitionSet.isEmpty()) { + throw new IllegalArgumentException("PartitionList is empty :" + inputReplicaSetID); + } + for (Map.Entry<Integer, Set<Partition>> entry : assignmentSet.entrySet()) { + if (!entry.getKey().equals(inputReplicaSetID)) { + Set<Partition> anotherPartitionSet = entry.getValue(); + int intersection = Sets.intersection(anotherPartitionSet, partitionSet).size(); + if (intersection > 0) { + throw new IllegalArgumentException("Intersection detected between : " + inputReplicaSetID + " with " + entry.getKey()); + } + } } } }