This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push: new faa15ad KYLIN-4355 add ut faa15ad is described below commit faa15ad8b13f6ccf9161b6bb59af84f29b9bf958 Author: XiaoxiangYu <hit_la...@126.com> AuthorDate: Sat Jun 13 22:20:19 2020 +0800 KYLIN-4355 add ut --- .../org/apache/kylin/common/KylinConfigBase.java | 7 ++ .../kylin/rest/controller/AdminController.java | 12 +++- .../kylin/rest/service/StreamingV2Service.java | 15 ++-- .../kylin/rest/service/StreamingV2ServiceTest.java | 84 ++++++++++++++++++++++ .../kylin/stream/coordinator/CoordinatorTest.java | 2 +- .../kylin/stream/server/StreamingServer.java | 2 +- 6 files changed, 113 insertions(+), 9 deletions(-) diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index dd7b0ad..9e17de6 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -2225,6 +2225,13 @@ public abstract class KylinConfigBase implements Serializable { return Boolean.parseBoolean(getOptional("kylin.web.set-config-enable", FALSE)); } + /** + * @see #isWebConfigEnabled + */ + public String getPropertiesWhiteListForModification() { + return getOptional("kylin.web.properties.whitelist", "kylin.query.cache-enabled"); + } + public String getPropertiesWhiteList() { return getOptional("kylin.web.properties.whitelist", "kylin.web.timezone,kylin.query.cache-enabled,kylin.env," + "kylin.web.hive-limit,kylin.storage.default," diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/AdminController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/AdminController.java index 2529c93..4d90db8 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/AdminController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/AdminController.java @@ -20,6 +20,9 @@ package org.apache.kylin.rest.controller; import java.io.IOException; import java.io.UnsupportedEncodingException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; import org.apache.commons.configuration.ConfigurationException; import org.apache.kylin.common.KylinConfig; @@ -50,6 +53,8 @@ import org.springframework.web.bind.annotation.ResponseBody; @RequestMapping(value = "/admin") public class AdminController extends BasicController { + private Set<String> propertiesWhiteList = new HashSet<>(); + @Autowired @Qualifier("adminService") private AdminService adminService; @@ -119,9 +124,12 @@ public class AdminController extends BasicController { adminService.cleanupStorage(); } - @RequestMapping(value = "/config", method = { RequestMethod.PUT }, produces = { "application/json" }) + @RequestMapping(value = "/config", method = {RequestMethod.PUT}, produces = {"application/json"}) public void updateKylinConfig(@RequestBody UpdateConfigRequest updateConfigRequest) { - if (!adminService.configWritableStatus()) { + if (propertiesWhiteList.isEmpty()) { + propertiesWhiteList.addAll(Arrays.asList(KylinConfig.getInstanceFromEnv().getPropertiesWhiteListForModification().split(","))); + } + if (!adminService.configWritableStatus() && !propertiesWhiteList.contains(updateConfigRequest.getKey())) { throw new BadRequestException("Update configuration from API is not allowed."); } adminService.updateConfig(updateConfigRequest.getKey(), updateConfigRequest.getValue()); diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java index 3c88b79..c43d625 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java @@ -6,15 +6,15 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. -*/ + */ package org.apache.kylin.rest.service; @@ -100,6 +100,11 @@ public class StreamingV2Service extends BasicService { receiverAdminClient = new HttpReceiverAdminClient(); } + StreamingV2Service(StreamMetadataStore metadataStore, ReceiverAdminClient adminClient) { + streamMetadataStore = metadataStore; + receiverAdminClient = adminClient; + } + public List<StreamingSourceConfig> listAllStreamingConfigs(final String table) throws IOException { List<StreamingSourceConfig> streamingSourceConfigs = Lists.newArrayList(); if (StringUtils.isEmpty(table)) { @@ -228,10 +233,10 @@ public class StreamingV2Service extends BasicService { getCoordinatorClient().reAssignCube(cubeName, newAssignment); } - private void validateAssignment(CubeAssignment newAssignment) { + void validateAssignment(CubeAssignment newAssignment) { Map<Integer, List<Partition>> assignments = newAssignment.getAssignments(); Map<Integer, Set<Partition>> assignmentSet = assignments.keySet().stream().collect( - Collectors.toMap(Function.identity(), HashSet::new)); + Collectors.toMap(Function.identity(), x -> new HashSet<>(assignments.get(x)))); Set<Integer> inputReplicaSetIDs = assignments.keySet(); Set<Integer> allReplicaSetIDs = Sets.newHashSet(streamMetadataStore.getReplicaSetIDs()); diff --git a/server-base/src/test/java/org/apache/kylin/rest/service/StreamingV2ServiceTest.java b/server-base/src/test/java/org/apache/kylin/rest/service/StreamingV2ServiceTest.java new file mode 100644 index 0000000..e3de205 --- /dev/null +++ b/server-base/src/test/java/org/apache/kylin/rest/service/StreamingV2ServiceTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.rest.service; + +import org.apache.kylin.stream.coordinator.StreamMetadataStore; +import org.apache.kylin.stream.core.model.CubeAssignment; +import org.apache.kylin.stream.core.source.Partition; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.mock; + +public class StreamingV2ServiceTest { + + @Test + public void testValidateAssignment() { + Map<Integer, List<Partition>> assignmentMap = new HashMap<>(); + Partition p1 = new Partition(1); + Partition p2 = new Partition(2); + Partition p3 = new Partition(3); + List<Integer> replicaSets = new ArrayList<>(); + replicaSets.add(0); + replicaSets.add(1); + List<Partition> l1 = new ArrayList<>(); + l1.add(p1); + l1.add(p2); + List<Partition> l2 = new ArrayList<>(); + l2.add(p3); + assignmentMap.put(0, l1); + assignmentMap.put(1, l2); + + CubeAssignment cubeAssignment = new CubeAssignment("test", assignmentMap); + StreamMetadataStore metadataStore = mock(StreamMetadataStore.class); + when(metadataStore.getReplicaSetIDs()).thenReturn(replicaSets); + StreamingV2Service streamingV2Service = new StreamingV2Service(metadataStore, null); + Exception exception = null; + + // normal case + streamingV2Service.validateAssignment(cubeAssignment); + + // bad case 1 + l2.add(p2); + try { + streamingV2Service.validateAssignment(cubeAssignment); + } catch (IllegalArgumentException ill) { + exception = ill; + ill.printStackTrace(); + } + Assert.assertNotNull("Intersection detected between : 0 with 1", exception); + + // bad case 2 + l2.clear(); + exception = null; + try { + streamingV2Service.validateAssignment(cubeAssignment); + } catch (IllegalArgumentException ill) { + exception = ill; + ill.printStackTrace(); + } + Assert.assertNotNull("PartitionList is empty", exception); + } +} diff --git a/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/CoordinatorTest.java b/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/CoordinatorTest.java index e6cdf80..f1e1644 100644 --- a/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/CoordinatorTest.java +++ b/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/CoordinatorTest.java @@ -207,7 +207,7 @@ public class CoordinatorTest extends LocalFileMetadataTestCase { } @Test - public void testReassignWithoutExeception() throws IOException { + public void testReassignWithoutException() throws IOException { ReceiverAdminClient receiverAdminClient = mockSuccessReceiverAdminClient(); coordinator = new Coordinator(metadataStore, receiverAdminClient); diff --git a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java index a948b9e..7ace4ec 100644 --- a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java +++ b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java @@ -558,7 +558,7 @@ public class StreamingServer implements ReplicaSetLeaderSelector.LeaderChangeLis if (latestRemoteSegment != null) { minAcceptEventTime = latestRemoteSegment.getTSRange().end.v; } - if (minAcceptEventTime > 0) { + if (minAcceptEventTime > 0 && minAcceptEventTime < System.currentTimeMillis()) { consumer.setMinAcceptEventTime(minAcceptEventTime); } StreamingCubeConsumeState consumeState = streamMetadataStore.getStreamingCubeConsumeState(cubeName);