KYLIN-1311 on the way
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4f41fd5c Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4f41fd5c Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4f41fd5c Branch: refs/heads/helix-201602 Commit: 4f41fd5c80351d31e83330d070d5dbd8c11c4ea5 Parents: bbfe8ae Author: shaofengshi <[email protected]> Authored: Fri Jan 22 11:01:48 2016 +0800 Committer: shaofengshi <[email protected]> Committed: Sat Feb 6 13:33:06 2016 +0800 ---------------------------------------------------------------------- build/bin/kylin.sh | 8 +- .../test_case_data/localmeta/kylin.properties | 2 +- server/pom.xml | 1 + .../rest/controller/ClusterController.java | 71 ++ .../kylin/rest/controller/JobController.java | 33 - .../rest/controller/StreamingController.java | 68 +- .../kylin/rest/helix/HelixClusterAdmin.java | 31 +- .../rest/helix/JobEngineTransitionHandler.java | 70 ++ .../helix/LeaderStandbyStateModelFactory.java | 125 +--- .../helix/StreamCubeBuildTransitionHandler.java | 107 +++ .../rest/request/StreamingBuildRequest.java | 29 +- .../security/KylinAuthenticationProvider.java | 3 +- .../kylin/rest/service/StreamingService.java | 34 +- .../rest/controller/UserControllerTest.java | 9 - .../kylin/rest/helix/HelixClusterAdminTest.java | 22 +- .../kylin/rest/service/CacheServiceTest.java | 720 +++++++++---------- .../kylin/rest/service/ServiceTestBase.java | 40 +- .../rest/service/TestBaseWithZookeeper.java | 74 ++ .../source/kafka/TimedJsonStreamParser.java | 7 +- 19 files changed, 825 insertions(+), 629 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/build/bin/kylin.sh ---------------------------------------------------------------------- diff --git a/build/bin/kylin.sh b/build/bin/kylin.sh index 5b03f43..d196fe6 100644 --- a/build/bin/kylin.sh +++ b/build/bin/kylin.sh @@ -70,7 +70,7 @@ then if [ -z 
"$KYLIN_REST_ADDRESS" ] then - kylin_rest_address=`hostname`":"`grep "<Connector port=" ${tomcat_root}/conf/server.xml |grep protocol=\"HTTP/1.1\" | cut -d '=' -f 2 | cut -d \" -f 2` + kylin_rest_address=`hostname -f`":"`grep "<Connector port=" ${tomcat_root}/conf/server.xml |grep protocol=\"HTTP/1.1\" | cut -d '=' -f 2 | cut -d \" -f 2` echo "KYLIN_REST_ADDRESS not found, will use ${kylin_rest_address}" else echo "KYLIN_REST_ADDRESS is set to: $KYLIN_REST_ADDRESS" @@ -154,12 +154,12 @@ then exit 0 elif [ "$2" == "stop" ] then - if [ ! -f "${KYLIN_HOME}/$3_$4" ] + if [ ! -f "${KYLIN_HOME}/logs/$3_$4" ] then echo "streaming is not running, please check" exit 1 fi - pid=`cat ${KYLIN_HOME}/$3_$4` + pid=`cat ${KYLIN_HOME}/logs/$3_$4` if [ "$pid" = "" ] then echo "streaming is not running, please check" @@ -168,7 +168,7 @@ then echo "stopping streaming:$pid" kill $pid fi - rm ${KYLIN_HOME}/$3_$4 + rm ${KYLIN_HOME}/logs/$3_$4 exit 0 else echo http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/examples/test_case_data/localmeta/kylin.properties ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/kylin.properties b/examples/test_case_data/localmeta/kylin.properties index 978102f..41a9895 100644 --- a/examples/test_case_data/localmeta/kylin.properties +++ b/examples/test_case_data/localmeta/kylin.properties @@ -6,7 +6,7 @@ [email protected] # List of web servers in use, this enables one web server instance to sync up with other servers. 
-#kylin.rest.servers=localhost:7070 +kylin.rest.servers=localhost:7070 # The metadata store in hbase kylin.metadata.url= http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/server/pom.xml ---------------------------------------------------------------------- diff --git a/server/pom.xml b/server/pom.xml index 86ec5a5..2359855 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -466,6 +466,7 @@ <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>${zookeeper.version}</version> + <scope>provided</scope> <exclusions> <exclusion> <groupId>junit</groupId> http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/server/src/main/java/org/apache/kylin/rest/controller/ClusterController.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/controller/ClusterController.java b/server/src/main/java/org/apache/kylin/rest/controller/ClusterController.java new file mode 100644 index 0000000..97fff36 --- /dev/null +++ b/server/src/main/java/org/apache/kylin/rest/controller/ClusterController.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.rest.controller; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.job.JobInstance; +import org.apache.kylin.job.constant.JobStatusEnum; +import org.apache.kylin.job.constant.JobTimeFilterEnum; +import org.apache.kylin.rest.exception.InternalErrorException; +import org.apache.kylin.rest.helix.HelixClusterAdmin; +import org.apache.kylin.rest.request.JobListRequest; +import org.apache.kylin.rest.service.JobService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.stereotype.Controller; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.ResponseBody; + +import java.util.*; + +/** + * + */ +@Controller +@RequestMapping(value = "cluster") +public class ClusterController extends BasicController implements InitializingBean { + private static final Logger logger = LoggerFactory.getLogger(ClusterController.class); + + /* + * (non-Javadoc) + * + * @see + * org.springframework.beans.factory.InitializingBean#afterPropertiesSet() + */ + @Override + public void afterPropertiesSet() throws Exception { + + final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + + final HelixClusterAdmin clusterAdmin = HelixClusterAdmin.getInstance(kylinConfig); + clusterAdmin.start(); + + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + @Override + public void run() { + clusterAdmin.stop(); + } + })); + + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/server/src/main/java/org/apache/kylin/rest/controller/JobController.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/controller/JobController.java 
b/server/src/main/java/org/apache/kylin/rest/controller/JobController.java index 77d987f..a61635d 100644 --- a/server/src/main/java/org/apache/kylin/rest/controller/JobController.java +++ b/server/src/main/java/org/apache/kylin/rest/controller/JobController.java @@ -61,42 +61,9 @@ public class JobController extends BasicController implements InitializingBean { */ @Override public void afterPropertiesSet() throws Exception { - String timeZone = jobService.getConfig().getTimeZone(); TimeZone tzone = TimeZone.getTimeZone(timeZone); TimeZone.setDefault(tzone); - - final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - - if (kylinConfig.isClusterEnabled() == true) { - logger.info("Kylin cluster enabled, will use Helix/zookeeper to coordinate."); - final HelixClusterAdmin clusterAdmin = HelixClusterAdmin.getInstance(kylinConfig); - clusterAdmin.start(); - - Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { - @Override - public void run() { - clusterAdmin.stop(); - } - })); - } else { - new Thread(new Runnable() { - @Override - public void run() { - try { - DefaultScheduler scheduler = DefaultScheduler.createInstance(); - scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock()); - if (!scheduler.hasStarted()) { - logger.error("scheduler has not been started"); - System.exit(1); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }).start(); - } - } /** http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java index fb806d1..209c552 100644 --- a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java +++ 
b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java @@ -26,11 +26,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.model.CubeBuildTypeEnum; -import org.apache.kylin.engine.streaming.BootstrapConfig; import org.apache.kylin.engine.streaming.StreamingConfig; -import org.apache.kylin.job.JobInstance; -import org.apache.kylin.job.exception.JobException; import org.apache.kylin.rest.exception.BadRequestException; import org.apache.kylin.rest.exception.ForbiddenException; import org.apache.kylin.rest.exception.InternalErrorException; @@ -45,7 +41,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.security.access.AccessDeniedException; -import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.*; @@ -93,7 +88,6 @@ public class StreamingController extends BasicController { } } - /** * * create Streaming Schema @@ -105,7 +99,7 @@ public class StreamingController extends BasicController { //Update Model StreamingConfig streamingConfig = deserializeSchemalDesc(streamingRequest); KafkaConfig kafkaConfig = deserializeKafkaSchemalDesc(streamingRequest); - if (streamingConfig == null ||kafkaConfig == null) { + if (streamingConfig == null || kafkaConfig == null) { return streamingRequest; } if (StringUtils.isEmpty(streamingConfig.getName())) { @@ -124,7 +118,7 @@ public class StreamingController extends BasicController { try { kafkaConfig.setUuid(UUID.randomUUID().toString()); kafkaConfigService.createKafkaConfig(kafkaConfig); - }catch (IOException e){ + } catch (IOException e) { try { streamingService.dropStreamingConfig(streamingConfig); } catch (IOException e1) { @@ -139,7 +133,7 @@ 
public class StreamingController extends BasicController { @RequestMapping(value = "", method = { RequestMethod.PUT }) @ResponseBody - public StreamingRequest updateModelDesc(@RequestBody StreamingRequest streamingRequest) throws JsonProcessingException { + public StreamingRequest updateModelDesc(@RequestBody StreamingRequest streamingRequest) throws JsonProcessingException { StreamingConfig streamingConfig = deserializeSchemalDesc(streamingRequest); KafkaConfig kafkaConfig = deserializeKafkaSchemalDesc(streamingRequest); @@ -156,7 +150,7 @@ public class StreamingController extends BasicController { } try { kafkaConfig = kafkaConfigService.updateKafkaConfig(kafkaConfig); - }catch (AccessDeniedException accessDeniedException) { + } catch (AccessDeniedException accessDeniedException) { throw new ForbiddenException("You don't have right to update this KafkaConfig."); } catch (Exception e) { logger.error("Failed to deal with the request:" + e.getLocalizedMessage(), e); @@ -203,7 +197,6 @@ public class StreamingController extends BasicController { return desc; } - private KafkaConfig deserializeKafkaSchemalDesc(StreamingRequest streamingRequest) { KafkaConfig desc = null; try { @@ -227,16 +220,14 @@ public class StreamingController extends BasicController { request.setMessage(message); } - - /** * Send a stream build request * - * @param cubeName Cube ID + * @param cubeName Cube Name * @return * @throws IOException */ - @RequestMapping(value = "/{cubeName}/build", method = {RequestMethod.PUT}) + @RequestMapping(value = "/{cubeName}/build", method = { RequestMethod.PUT }) @ResponseBody public StreamingBuildRequest buildStream(@PathVariable String cubeName, @RequestBody StreamingBuildRequest streamingBuildRequest) { StreamingConfig streamingConfig = streamingService.getStreamingManager().getStreamingConfigByCube(cubeName); @@ -244,27 +235,54 @@ public class StreamingController extends BasicController { List<CubeInstance> cubes = cubeService.getCubes(cubeName, null, null, 
null, null); Preconditions.checkArgument(cubes.size() == 1, "Cube '" + cubeName + "' is not found."); CubeInstance cube = cubes.get(0); - if (streamingBuildRequest.isFillGap() == false) { - Preconditions.checkArgument(streamingBuildRequest.getEnd() > streamingBuildRequest.getStart(), "End time should be greater than start time."); - for (CubeSegment segment : cube.getSegments()) { - if (segment.getDateRangeStart() <= streamingBuildRequest.getStart() && segment.getDateRangeEnd() >= streamingBuildRequest.getEnd()) { - streamingBuildRequest.setMessage("The segment already exists: " + segment.toString()); - streamingBuildRequest.setSuccessful(false); - return streamingBuildRequest; - } + if (streamingBuildRequest.getEnd() <= streamingBuildRequest.getStart()) { + streamingBuildRequest.setMessage("End time should be greater than start time.");streamingBuildRequest.setSuccessful(false); + return streamingBuildRequest; + } + + for (CubeSegment segment : cube.getSegments()) { + if (segment.getDateRangeStart() <= streamingBuildRequest.getStart() && segment.getDateRangeEnd() >= streamingBuildRequest.getEnd()) { + streamingBuildRequest.setMessage("The segment already exists: " + segment.toString()); + streamingBuildRequest.setSuccessful(false); + return streamingBuildRequest; } } streamingBuildRequest.setStreaming(streamingConfig.getName()); - streamingService.buildStream(cubeName, streamingBuildRequest); + streamingService.buildStream(cube, streamingBuildRequest); streamingBuildRequest.setMessage("Build request is submitted successfully."); streamingBuildRequest.setSuccessful(true); return streamingBuildRequest; } + /** + * Send a stream fillGap request + * + * @param cubeName Cube Name + * @return + * @throws IOException + */ + @RequestMapping(value = "/{cubeName}/fillgap", method = { RequestMethod.PUT }) + @ResponseBody + public StreamingBuildRequest fillGap(@PathVariable String cubeName) { + StreamingConfig streamingConfig = 
streamingService.getStreamingManager().getStreamingConfigByCube(cubeName); + Preconditions.checkNotNull(streamingConfig, "Stream config for '" + cubeName + "' is not found."); + List<CubeInstance> cubes = cubeService.getCubes(cubeName, null, null, null, null); + Preconditions.checkArgument(cubes.size() == 1, "Cube '" + cubeName + "' is not found."); + CubeInstance cube = cubes.get(0); + + StreamingBuildRequest streamingBuildRequest = new StreamingBuildRequest(); + streamingBuildRequest.setStreaming(streamingConfig.getName()); + streamingService.fillGap(cube); + streamingBuildRequest.setMessage("FillGap request is submitted successfully."); + streamingBuildRequest.setSuccessful(true); + return streamingBuildRequest; + + } + public void setStreamingService(StreamingService streamingService) { - this.streamingService= streamingService; + this.streamingService = streamingService; } public void setKafkaConfigService(KafkaConfigService kafkaConfigService) { http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java b/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java index 9850e24..0758ef1 100644 --- a/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java +++ b/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java @@ -33,8 +33,10 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.restclient.Broadcaster; import org.apache.kylin.common.util.StringUtil; import org.apache.kylin.rest.constant.Constant; +import org.apache.kylin.rest.request.StreamingBuildRequest; import org.apache.kylin.storage.hbase.HBaseConnection; import org.apache.kylin.storage.hbase.util.ZookeeperJobLock; +import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; @@ -126,14 +128,13 @@ public class HelixClusterAdmin { } - public void addStreamingJob(String streamingName, long start, long end) { - String resourceName = RESOURCE_STREAME_CUBE_PREFIX + streamingName + "_" + start + "_" + end; - if (!admin.getResourcesInCluster(clusterName).contains(resourceName)) { - admin.addResource(clusterName, resourceName, 1, MODEL_LEADER_STANDBY, IdealState.RebalanceMode.SEMI_AUTO.name()); - } else { - logger.warn("Resource '" + resourceName + "' already exists in cluster, skip adding."); + public void addStreamingJob(StreamingBuildRequest streamingBuildRequest) { + String resourceName = streamingBuildRequest.toResourceName(); + if (admin.getResourcesInCluster(clusterName).contains(resourceName)) { + logger.warn("Resource '" + resourceName + "' already exists in cluster, remove and re-add."); + admin.dropResource(clusterName, resourceName); } - + admin.addResource(clusterName, resourceName, 1, MODEL_LEADER_STANDBY, IdealState.RebalanceMode.SEMI_AUTO.name()); admin.rebalance(clusterName, resourceName, 2, "", TAG_STREAM_BUILDER); } @@ -150,7 +151,7 @@ public class HelixClusterAdmin { */ protected void startInstance(String instanceName) throws Exception { participantManager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName, InstanceType.PARTICIPANT, zkAddress); - participantManager.getStateMachineEngine().registerStateModelFactory(StateModelDefId.from(MODEL_LEADER_STANDBY), new LeaderStandbyStateModelFactory()); + participantManager.getStateMachineEngine().registerStateModelFactory(StateModelDefId.from(MODEL_LEADER_STANDBY), new LeaderStandbyStateModelFactory(this.kylinConfig)); participantManager.connect(); participantManager.addLiveInstanceChangeListener(new KylinClusterLiveInstanceChangeListener()); @@ -179,10 +180,12 @@ public class HelixClusterAdmin { public void stop() { if (participantManager != null) { participantManager.disconnect(); + participantManager = null; } if (controllerManager != 
null) { controllerManager.disconnect(); + controllerManager = null; } } @@ -269,11 +272,13 @@ public class HelixClusterAdmin { int indexOfUnderscore = instanceName.lastIndexOf("_"); instanceRestAddresses.add(instanceName.substring(0, indexOfUnderscore) + ":" + instanceName.substring(indexOfUnderscore + 1)); } - String restServersInCluster = StringUtil.join(instanceRestAddresses, ","); - kylinConfig.setProperty("kylin.rest.servers", restServersInCluster); - System.setProperty("kylin.rest.servers", restServersInCluster); - logger.info("kylin.rest.servers update to " + restServersInCluster); - Broadcaster.clearCache(); + if (instanceRestAddresses.size() > 0) { + String restServersInCluster = StringUtil.join(instanceRestAddresses, ","); + kylinConfig.setProperty("kylin.rest.servers", restServersInCluster); + System.setProperty("kylin.rest.servers", restServersInCluster); + logger.info("kylin.rest.servers update to " + restServersInCluster); + Broadcaster.clearCache(); + } } } } http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/server/src/main/java/org/apache/kylin/rest/helix/JobEngineTransitionHandler.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/helix/JobEngineTransitionHandler.java b/server/src/main/java/org/apache/kylin/rest/helix/JobEngineTransitionHandler.java new file mode 100644 index 0000000..3ef04ee --- /dev/null +++ b/server/src/main/java/org/apache/kylin/rest/helix/JobEngineTransitionHandler.java @@ -0,0 +1,70 @@ +package org.apache.kylin.rest.helix; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import org.apache.helix.NotificationContext; +import org.apache.helix.api.TransitionHandler; +import org.apache.helix.model.Message; +import org.apache.helix.participant.statemachine.Transition; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.job.engine.JobEngineConfig; +import 
org.apache.kylin.job.impl.threadpool.DefaultScheduler; +import org.apache.kylin.job.lock.MockJobLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ConcurrentMap; + +/** + */ +public class JobEngineTransitionHandler extends TransitionHandler { + private static final Logger logger = LoggerFactory.getLogger(JobEngineTransitionHandler.class); + private final KylinConfig kylinConfig; + + private static ConcurrentMap<KylinConfig, JobEngineTransitionHandler> instanceMaps = Maps.newConcurrentMap(); + + private JobEngineTransitionHandler(KylinConfig kylinConfig) { + this.kylinConfig = kylinConfig; + } + + public static JobEngineTransitionHandler getInstance(KylinConfig kylinConfig) { + Preconditions.checkNotNull(kylinConfig); + instanceMaps.putIfAbsent(kylinConfig, new JobEngineTransitionHandler(kylinConfig)); + return instanceMaps.get(kylinConfig); + } + + @Transition(to = "LEADER", from = "STANDBY") + public void onBecomeLeaderFromStandby(Message message, NotificationContext context) { + logger.info("JobEngineStateModel.onBecomeLeaderFromStandby()"); + try { + DefaultScheduler scheduler = DefaultScheduler.createInstance(); + scheduler.init(new JobEngineConfig(this.kylinConfig), new MockJobLock()); + while (!scheduler.hasStarted()) { + logger.error("scheduler has not been started"); + Thread.sleep(1000); + } + } catch (Exception e) { + logger.error("error start DefaultScheduler", e); + throw new RuntimeException(e); + } + } + + @Transition(to = "STANDBY", from = "LEADER") + public void onBecomeStandbyFromLeader(Message message, NotificationContext context) { + logger.info("JobEngineStateModel.onBecomeStandbyFromLeader()"); + DefaultScheduler.destroyInstance(); + + } + + @Transition(to = "STANDBY", from = "OFFLINE") + public void onBecomeStandbyFromOffline(Message message, NotificationContext context) { + logger.info("JobEngineStateModel.onBecomeStandbyFromOffline()"); + + } + + @Transition(to = "OFFLINE", from = "STANDBY") + 
public void onBecomeOfflineFromStandby(Message message, NotificationContext context) { + logger.info("JobEngineStateModel.onBecomeOfflineFromStandby()"); + + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java b/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java index 8614e8c..940c9c2 100644 --- a/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java +++ b/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java @@ -1,146 +1,35 @@ package org.apache.kylin.rest.helix; -import com.google.common.base.Preconditions; -import org.apache.helix.NotificationContext; import org.apache.helix.api.StateTransitionHandlerFactory; import org.apache.helix.api.TransitionHandler; import org.apache.helix.api.id.PartitionId; import org.apache.helix.api.id.ResourceId; -import org.apache.helix.model.Message; -import org.apache.helix.participant.statemachine.Transition; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.KylinConfigBase; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.engine.streaming.StreamingManager; -import org.apache.kylin.job.engine.JobEngineConfig; -import org.apache.kylin.job.impl.threadpool.DefaultScheduler; -import org.apache.kylin.job.lock.MockJobLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; - import static org.apache.kylin.rest.helix.HelixClusterAdmin.RESOURCE_STREAME_CUBE_PREFIX; /** */ public class LeaderStandbyStateModelFactory extends 
StateTransitionHandlerFactory<TransitionHandler> { - private static final Logger logger = LoggerFactory.getLogger(LeaderStandbyStateModelFactory.class); + private final KylinConfig kylinConfig; + + public LeaderStandbyStateModelFactory(KylinConfig kylinConfig) { + this.kylinConfig = kylinConfig; + } @Override public TransitionHandler createStateTransitionHandler(PartitionId partitionId) { if (partitionId.getResourceId().equals(ResourceId.from(HelixClusterAdmin.RESOURCE_NAME_JOB_ENGINE))) { - return JobEngineStateModel.INSTANCE; + return JobEngineTransitionHandler.getInstance(kylinConfig); } if (partitionId.getResourceId().stringify().startsWith(RESOURCE_STREAME_CUBE_PREFIX)) { - return StreamCubeStateModel.INSTANCE; + return StreamCubeBuildTransitionHandler.getInstance(kylinConfig); } return null; } - public static class JobEngineStateModel extends TransitionHandler { - - public static JobEngineStateModel INSTANCE = new JobEngineStateModel(); - - @Transition(to = "LEADER", from = "STANDBY") - public void onBecomeLeaderFromStandby(Message message, NotificationContext context) { - logger.info("JobEngineStateModel.onBecomeLeaderFromStandby()"); - try { - final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - DefaultScheduler scheduler = DefaultScheduler.createInstance(); - scheduler.init(new JobEngineConfig(kylinConfig), new MockJobLock()); - while (!scheduler.hasStarted()) { - logger.error("scheduler has not been started"); - Thread.sleep(1000); - } - } catch (Exception e) { - logger.error("error start DefaultScheduler", e); - throw new RuntimeException(e); - } - } - - @Transition(to = "STANDBY", from = "LEADER") - public void onBecomeStandbyFromLeader(Message message, NotificationContext context) { - logger.info("JobEngineStateModel.onBecomeStandbyFromLeader()"); - DefaultScheduler.destroyInstance(); - - } - - @Transition(to = "STANDBY", from = "OFFLINE") - public void onBecomeStandbyFromOffline(Message message, NotificationContext context) { - 
logger.info("JobEngineStateModel.onBecomeStandbyFromOffline()"); - - } - - @Transition(to = "OFFLINE", from = "STANDBY") - public void onBecomeOfflineFromStandby(Message message, NotificationContext context) { - logger.info("JobEngineStateModel.onBecomeOfflineFromStandby()"); - - } - } - - public static class StreamCubeStateModel extends TransitionHandler { - - public static StreamCubeStateModel INSTANCE = new StreamCubeStateModel(); - - @Transition(to = "LEADER", from = "STANDBY") - public void onBecomeLeaderFromStandby(Message message, NotificationContext context) { - String resourceName = message.getResourceId().stringify(); - Preconditions.checkArgument(resourceName.startsWith(RESOURCE_STREAME_CUBE_PREFIX)); - long end = Long.parseLong(resourceName.substring(resourceName.lastIndexOf("_") + 1)); - String temp = resourceName.substring(RESOURCE_STREAME_CUBE_PREFIX.length(), resourceName.lastIndexOf("_")); - long start = Long.parseLong(temp.substring(temp.lastIndexOf("_") + 1)); - String streamingConfig = temp.substring(0, temp.lastIndexOf("_")); - - final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - - final String cubeName = StreamingManager.getInstance(kylinConfig).getStreamingConfig(streamingConfig).getCubeName(); - final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(cubeName); - for (CubeSegment segment : cube.getSegments()) { - if (segment.getDateRangeStart() <= start && segment.getDateRangeEnd() >= end) { - logger.info("Segment " + segment.getName() + " already exist, no need rebuild."); - return; - } - } - - KylinConfigBase.getKylinHome(); - String segmentId = start + "_" + end; - String cmd = KylinConfigBase.getKylinHome() + "/bin/kylin.sh streaming start " + streamingConfig + " " + segmentId + " -oneoff true -start " + start + " -end " + end + " -streaming " + streamingConfig; - logger.info("Executing: " + cmd); - try { - String line; - Process p = Runtime.getRuntime().exec(cmd); - BufferedReader input = new 
BufferedReader(new InputStreamReader(p.getInputStream())); - while ((line = input.readLine()) != null) { - logger.info(line); - } - input.close(); - } catch (IOException err) { - logger.error("Error happens during build streaming '" + resourceName + "'", err); - throw new RuntimeException(err); - } - - } - - @Transition(to = "STANDBY", from = "LEADER") - public void onBecomeStandbyFromLeader(Message message, NotificationContext context) { - - } - - @Transition(to = "STANDBY", from = "OFFLINE") - public void onBecomeStandbyFromOffline(Message message, NotificationContext context) { - - } - - @Transition(to = "OFFLINE", from = "STANDBY") - public void onBecomeOfflineFromStandby(Message message, NotificationContext context) { - - } - } } http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/server/src/main/java/org/apache/kylin/rest/helix/StreamCubeBuildTransitionHandler.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/helix/StreamCubeBuildTransitionHandler.java b/server/src/main/java/org/apache/kylin/rest/helix/StreamCubeBuildTransitionHandler.java new file mode 100644 index 0000000..44d8302 --- /dev/null +++ b/server/src/main/java/org/apache/kylin/rest/helix/StreamCubeBuildTransitionHandler.java @@ -0,0 +1,107 @@ +package org.apache.kylin.rest.helix; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import org.apache.helix.NotificationContext; +import org.apache.helix.api.TransitionHandler; +import org.apache.helix.model.Message; +import org.apache.helix.participant.statemachine.Transition; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.KylinConfigBase; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.engine.streaming.StreamingManager; +import org.apache.kylin.rest.request.StreamingBuildRequest; +import 
org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Transition handler for streaming cube build resources.
+ *
+ * <p>The resource name carried by the transition message is decoded into a
+ * {@link StreamingBuildRequest} (streaming name, start, end). Becoming LEADER
+ * launches {@code kylin.sh streaming start ...} for that range unless an existing
+ * cube segment already covers it; going back to STANDBY launches
+ * {@code kylin.sh streaming stop ...}. The OFFLINE/STANDBY transitions are no-ops.
+ *
+ * <p>One handler is cached per {@link KylinConfig}; obtain it via
+ * {@link #getInstance(KylinConfig)}.
+ */
+public class StreamCubeBuildTransitionHandler extends TransitionHandler {
+
+    private static final Logger logger = LoggerFactory.getLogger(StreamCubeBuildTransitionHandler.class);
+
+    /** Handler cache, one entry per KylinConfig. */
+    private static final ConcurrentMap<KylinConfig, StreamCubeBuildTransitionHandler> instanceMaps = Maps.newConcurrentMap();
+    private final KylinConfig kylinConfig;
+
+    private StreamCubeBuildTransitionHandler(KylinConfig kylinConfig) {
+        this.kylinConfig = kylinConfig;
+    }
+
+    /**
+     * Returns the handler bound to the given config, creating it on first use.
+     *
+     * @param kylinConfig the config the handler works against; must not be null
+     * @return the single cached handler for {@code kylinConfig}
+     * @throws NullPointerException if {@code kylinConfig} is null
+     */
+    public static StreamCubeBuildTransitionHandler getInstance(KylinConfig kylinConfig) {
+        Preconditions.checkNotNull(kylinConfig);
+        StreamCubeBuildTransitionHandler handler = instanceMaps.get(kylinConfig);
+        if (handler == null) {
+            // Only allocate when the cache misses; putIfAbsent settles races between callers.
+            StreamCubeBuildTransitionHandler created = new StreamCubeBuildTransitionHandler(kylinConfig);
+            handler = instanceMaps.putIfAbsent(kylinConfig, created);
+            if (handler == null) {
+                handler = created;
+            }
+        }
+        return handler;
+    }
+
+    /**
+     * Starts a one-off streaming build for the range encoded in the resource name,
+     * unless a segment of the target cube already covers that range.
+     *
+     * @throws NullPointerException if the streaming config or its cube cannot be found
+     * @throws RuntimeException if the build script cannot be started or read
+     */
+    @Transition(to = "LEADER", from = "STANDBY")
+    public void onBecomeLeaderFromStandby(Message message, NotificationContext context) {
+        String resourceName = message.getResourceId().stringify();
+        StreamingBuildRequest streamingBuildRequest = StreamingBuildRequest.fromResourceName(resourceName);
+        String streaming = streamingBuildRequest.getStreaming();
+
+        // Both lookups can return null; fail with a message that names what is missing.
+        final String cubeName = Preconditions.checkNotNull(StreamingManager.getInstance(kylinConfig).getStreamingConfig(streaming), "Streaming config '%s' not found", streaming).getCubeName();
+        final CubeInstance cube = Preconditions.checkNotNull(CubeManager.getInstance(kylinConfig).getCube(cubeName), "Cube '%s' not found", cubeName);
+        for (CubeSegment segment : cube.getSegments()) {
+            if (segment.getDateRangeStart() <= streamingBuildRequest.getStart() && segment.getDateRangeEnd() >= streamingBuildRequest.getEnd()) {
+                logger.info("Segment " + segment.getName() + " already exist, no need rebuild.");
+                return;
+            }
+        }
+
+        String start = String.valueOf(streamingBuildRequest.getStart());
+        String end = String.valueOf(streamingBuildRequest.getEnd());
+        String segmentId = start + "_" + end;
+        runKylinScript("build", resourceName, KylinConfigBase.getKylinHome() + "/bin/kylin.sh", "streaming", "start", streaming, segmentId, "-oneoff", "true", "-start", start, "-end", end, "-streaming", streaming);
+    }
+
+    /**
+     * Stops the streaming build that belongs to the resource named in the message.
+     *
+     * @throws RuntimeException if the stop script cannot be started or read
+     */
+    @Transition(to = "STANDBY", from = "LEADER")
+    public void onBecomeStandbyFromLeader(Message message, NotificationContext context) {
+        String resourceName = message.getResourceId().stringify();
+        StreamingBuildRequest streamingBuildRequest = StreamingBuildRequest.fromResourceName(resourceName);
+        String segmentId = streamingBuildRequest.getStart() + "_" + streamingBuildRequest.getEnd();
+        runKylinScript("stop", resourceName, KylinConfigBase.getKylinHome() + "/bin/kylin.sh", "streaming", "stop", streamingBuildRequest.getStreaming(), segmentId);
+    }
+
+    /** Nothing to prepare when a replica comes online as STANDBY. */
+    @Transition(to = "STANDBY", from = "OFFLINE")
+    public void onBecomeStandbyFromOffline(Message message, NotificationContext context) {
+
+    }
+
+    /** Nothing to release when a STANDBY replica goes OFFLINE. */
+    @Transition(to = "OFFLINE", from = "STANDBY")
+    public void onBecomeOfflineFromStandby(Message message, NotificationContext context) {
+
+    }
+
+    /**
+     * Runs the given command, copies its output to the log line by line and waits
+     * for it to exit.
+     *
+     * <p>Arguments are handed to the process as-is (no whitespace re-tokenizing), and
+     * stderr is merged into stdout so the child cannot stall on an unread error pipe.
+     * A non-zero exit status is logged but does not fail the transition.
+     *
+     * @param action       short verb used in the error log ("build", "stop")
+     * @param resourceName resource the command is run for, used in log messages
+     * @param command      executable followed by its arguments
+     * @throws RuntimeException if the process cannot be started, its output cannot be
+     *                          read, or the calling thread is interrupted while waiting
+     */
+    private void runKylinScript(String action, String resourceName, String... command) {
+        StringBuilder cmd = new StringBuilder();
+        for (String arg : command) {
+            if (cmd.length() > 0) {
+                cmd.append(' ');
+            }
+            cmd.append(arg);
+        }
+        logger.info("Executing: " + cmd);
+        try {
+            Process p = new ProcessBuilder(command).redirectErrorStream(true).start();
+            // try-with-resources closes the reader even when readLine() fails
+            try (BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()))) {
+                String line;
+                while ((line = input.readLine()) != null) {
+                    logger.info(line);
+                }
+            }
+            int exitCode = p.waitFor();
+            if (exitCode != 0) {
+                logger.error("Command for streaming '" + resourceName + "' exited with code " + exitCode + ": " + cmd);
+            }
+        } catch (IOException err) {
+            logger.error("Error happens during " + action + " streaming '" + resourceName + "'", err);
+            throw new RuntimeException(err);
+        } catch (InterruptedException err) {
+            // restore the interrupt flag for whoever owns this thread
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(err);
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java b/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java index e06a06c..dcf91fd 100644 --- a/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java +++ b/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java @@ -18,15 +18,28 @@ package org.apache.kylin.rest.request; +import com.google.common.base.Preconditions; +import org.apache.kylin.rest.helix.HelixClusterAdmin; + +import static org.apache.kylin.rest.helix.HelixClusterAdmin.RESOURCE_STREAME_CUBE_PREFIX; + public class StreamingBuildRequest { private String streaming; private long start; private long end; - private boolean fillGap; private String message; private boolean successful; + public StreamingBuildRequest() { + } + + public StreamingBuildRequest(String streaming, long start, long end) { + this.streaming = streaming; + this.start = start; + this.end = end; + } + public String getStreaming() { return streaming; } @@ -67,11 +80,17 @@ public class StreamingBuildRequest { this.end = end; } - public boolean isFillGap() { - return fillGap; + public String toResourceName() { + return HelixClusterAdmin.RESOURCE_STREAME_CUBE_PREFIX + streaming + "_" + start + "_" + end; } - public void setFillGap(boolean fillGap) { - this.fillGap = fillGap; + public static StreamingBuildRequest fromResourceName(String resourceName) { + Preconditions.checkArgument(resourceName.startsWith(RESOURCE_STREAME_CUBE_PREFIX)); + long end = Long.parseLong(resourceName.substring(resourceName.lastIndexOf("_") + 1)); + String temp = resourceName.substring(RESOURCE_STREAME_CUBE_PREFIX.length(), resourceName.lastIndexOf("_")); + long start = 
Long.parseLong(temp.substring(temp.lastIndexOf("_") + 1)); + String streamingConfig = temp.substring(0, temp.lastIndexOf("_")); + + return new StreamingBuildRequest(streamingConfig, start, end); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/server/src/main/java/org/apache/kylin/rest/security/KylinAuthenticationProvider.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/security/KylinAuthenticationProvider.java b/server/src/main/java/org/apache/kylin/rest/security/KylinAuthenticationProvider.java index 1f147ef..b8dcd43 100644 --- a/server/src/main/java/org/apache/kylin/rest/security/KylinAuthenticationProvider.java +++ b/server/src/main/java/org/apache/kylin/rest/security/KylinAuthenticationProvider.java @@ -72,7 +72,8 @@ public class KylinAuthenticationProvider implements AuthenticationProvider { } logger.debug("Authenticated user " + authed.toString()); - + + SecurityContextHolder.getContext().setAuthentication(authed); UserDetails user; if (authed.getDetails() == null) { http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java b/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java index da20949..7c2cc48 100644 --- a/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java +++ b/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java @@ -21,7 +21,6 @@ package org.apache.kylin.rest.service; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.engine.streaming.BootstrapConfig; import org.apache.kylin.engine.streaming.StreamingConfig; import org.apache.kylin.engine.streaming.StreamingManager; 
import org.apache.kylin.engine.streaming.monitor.StreamingMonitor; @@ -54,8 +53,8 @@ public class StreamingService extends BasicService { if (null == cubeInstance) { streamingConfigs = getStreamingManager().listAllStreaming(); } else { - for(StreamingConfig config : getStreamingManager().listAllStreaming()){ - if(cubeInstance.getName().equals(config.getCubeName())){ + for (StreamingConfig config : getStreamingManager().listAllStreaming()) { + if (cubeInstance.getName().equals(config.getCubeName())) { streamingConfigs.add(config); } } @@ -84,34 +83,35 @@ public class StreamingService extends BasicService { if (getStreamingManager().getStreamingConfig(config.getName()) != null) { throw new InternalErrorException("The streamingConfig named " + config.getName() + " already exists"); } - StreamingConfig streamingConfig = getStreamingManager().saveStreamingConfig(config); + StreamingConfig streamingConfig = getStreamingManager().saveStreamingConfig(config); return streamingConfig; } -// @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#desc, 'ADMINISTRATION') or hasPermission(#desc, 'MANAGEMENT')") + // @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#desc, 'ADMINISTRATION') or hasPermission(#desc, 'MANAGEMENT')") public StreamingConfig updateStreamingConfig(StreamingConfig config) throws IOException { return getStreamingManager().updateStreamingConfig(config); } -// @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#desc, 'ADMINISTRATION') or hasPermission(#desc, 'MANAGEMENT')") + // @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#desc, 'ADMINISTRATION') or hasPermission(#desc, 'MANAGEMENT')") public void dropStreamingConfig(StreamingConfig config) throws IOException { getStreamingManager().removeStreamingConfig(config); } + @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'OPERATION') or hasPermission(#cube, 'MANAGEMENT')") + public 
void buildStream(CubeInstance cube, StreamingBuildRequest streamingBuildRequest) { + HelixClusterAdmin clusterAdmin = HelixClusterAdmin.getInstance(KylinConfig.getInstanceFromEnv()); + clusterAdmin.addStreamingJob(streamingBuildRequest); + } @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'OPERATION') or hasPermission(#cube, 'MANAGEMENT')") - public void buildStream(String cube, StreamingBuildRequest streamingBuildRequest) { + public void fillGap(CubeInstance cube) { HelixClusterAdmin clusterAdmin = HelixClusterAdmin.getInstance(KylinConfig.getInstanceFromEnv()); - if (streamingBuildRequest.isFillGap()) { - final StreamingConfig streamingConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfig(streamingBuildRequest.getStreaming()); - final List<Pair<Long, Long>> gaps = StreamingMonitor.findGaps(streamingConfig.getCubeName()); - logger.info("all gaps:" + org.apache.commons.lang3.StringUtils.join(gaps, ",")); - for (Pair<Long, Long> gap : gaps) { - clusterAdmin.addStreamingJob(streamingBuildRequest.getStreaming(), gap.getFirst(), gap.getSecond()); - } - } else { - clusterAdmin.addStreamingJob(streamingBuildRequest.getStreaming(), streamingBuildRequest.getStart(), streamingBuildRequest.getEnd()); + final StreamingConfig streamingConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfigByCube(cube.getName()); + final List<Pair<Long, Long>> gaps = StreamingMonitor.findGaps(streamingConfig.getCubeName()); + logger.info("all gaps:" + org.apache.commons.lang3.StringUtils.join(gaps, ",")); + for (Pair<Long, Long> gap : gaps) { + StreamingBuildRequest streamingBuildRequest = new StreamingBuildRequest(streamingConfig.getName(), gap.getFirst(), gap.getSecond()); + clusterAdmin.addStreamingJob(streamingBuildRequest); } } - } 
http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/server/src/test/java/org/apache/kylin/rest/controller/UserControllerTest.java ---------------------------------------------------------------------- diff --git a/server/src/test/java/org/apache/kylin/rest/controller/UserControllerTest.java b/server/src/test/java/org/apache/kylin/rest/controller/UserControllerTest.java index ab77a9a..fe0e67a 100644 --- a/server/src/test/java/org/apache/kylin/rest/controller/UserControllerTest.java +++ b/server/src/test/java/org/apache/kylin/rest/controller/UserControllerTest.java @@ -41,15 +41,6 @@ public class UserControllerTest extends ServiceTestBase { private UserController userController; - @BeforeClass - public static void setupResource() { - staticCreateTestMetadata(); - List<GrantedAuthority> authorities = new ArrayList<GrantedAuthority>(); - User user = new User("ADMIN", "ADMIN", authorities); - Authentication authentication = new TestingAuthenticationToken(user, "ADMIN", "ROLE_ADMIN"); - SecurityContextHolder.getContext().setAuthentication(authentication); - } - @Before public void setup() throws Exception { super.setup(); http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/server/src/test/java/org/apache/kylin/rest/helix/HelixClusterAdminTest.java ---------------------------------------------------------------------- diff --git a/server/src/test/java/org/apache/kylin/rest/helix/HelixClusterAdminTest.java b/server/src/test/java/org/apache/kylin/rest/helix/HelixClusterAdminTest.java index 594e76b5..1c8b779 100644 --- a/server/src/test/java/org/apache/kylin/rest/helix/HelixClusterAdminTest.java +++ b/server/src/test/java/org/apache/kylin/rest/helix/HelixClusterAdminTest.java @@ -25,6 +25,7 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.helix.manager.zk.ZKHelixAdmin; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.LocalFileMetadataTestCase; +import org.apache.kylin.rest.service.TestBaseWithZookeeper; import 
org.junit.After; import org.junit.Before; import org.junit.Test; @@ -39,10 +40,7 @@ import static org.junit.Assert.assertTrue; /** */ -public class HelixClusterAdminTest extends LocalFileMetadataTestCase { - - String zkAddress = "localhost:2199"; - ZkServer server; +public class HelixClusterAdminTest extends TestBaseWithZookeeper { HelixClusterAdmin clusterAdmin1; HelixClusterAdmin clusterAdmin2; @@ -52,21 +50,8 @@ public class HelixClusterAdminTest extends LocalFileMetadataTestCase { @Before public void setup() throws Exception { - createTestMetadata(); - // start zookeeper on localhost - final File tmpDir = File.createTempFile("HelixClusterAdminTest", null); - FileUtil.fullyDelete(tmpDir); - tmpDir.mkdirs(); - server = new ZkServer(tmpDir.getAbsolutePath() + "/dataDir", tmpDir.getAbsolutePath() + "/logDir", new IDefaultNameSpace() { - @Override - public void createDefaultNameSpace(ZkClient zkClient) { - } - }, 2199); - server.start(); - kylinConfig = this.getTestConfig(); kylinConfig.setRestAddress("localhost:7070"); - kylinConfig.setZookeeperAddress(zkAddress); kylinConfig.setClusterName(CLUSTER_NAME); final ZKHelixAdmin zkHelixAdmin = new ZKHelixAdmin(zkAddress); @@ -105,7 +90,7 @@ public class HelixClusterAdminTest extends LocalFileMetadataTestCase { // 3. 
shutdown the first instance clusterAdmin1.stop(); - clusterAdmin1 = null; +// clusterAdmin1 = null; Thread.sleep(1000); assertTrue(clusterAdmin2.isLeaderRole(RESOURCE_NAME_JOB_ENGINE)); assertEquals(1, kylinConfig.getRestServers().length); @@ -133,7 +118,6 @@ public class HelixClusterAdminTest extends LocalFileMetadataTestCase { clusterAdmin2.stop(); } - server.shutdown(); cleanupTestMetadata(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java ---------------------------------------------------------------------- diff --git a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java index 763bebe..8193884 100644 --- a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java +++ b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java @@ -1,366 +1,354 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
-*/ - -package org.apache.kylin.rest.service; - -import static org.junit.Assert.*; - -import java.io.File; -import java.util.Arrays; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicLong; - -import org.I0Itec.zkclient.IDefaultNameSpace; -import org.I0Itec.zkclient.ZkClient; -import org.I0Itec.zkclient.ZkServer; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.restclient.Broadcaster; -import org.apache.kylin.common.util.LocalFileMetadataTestCase; -import org.apache.kylin.cube.CubeDescManager; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.CubeUpdate; -import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.metadata.MetadataManager; -import org.apache.kylin.metadata.model.DataModelDesc; -import org.apache.kylin.metadata.model.LookupDesc; -import org.apache.kylin.metadata.model.TableDesc; -import org.apache.kylin.metadata.project.ProjectInstance; -import org.apache.kylin.metadata.project.ProjectManager; -import org.apache.kylin.metadata.realization.IRealization; -import org.apache.kylin.metadata.realization.RealizationType; -import org.apache.kylin.rest.broadcaster.BroadcasterReceiveServlet; -import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.servlet.ServletContextHandler; -import org.eclipse.jetty.servlet.ServletHolder; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - */ -public class CacheServiceTest extends LocalFileMetadataTestCase { - - private static Server server; - - private static String ZK_ADDRESS = "localhost:2199"; - - private static KylinConfig configA; - private static KylinConfig configB; - - private static final Logger logger = LoggerFactory.getLogger(CacheServiceTest.class); - - private static 
AtomicLong counter = new AtomicLong(); - - @BeforeClass - public static void beforeClass() throws Exception { - staticCreateTestMetadata(); - configA = KylinConfig.getInstanceFromEnv(); - configA.setProperty("kylin.rest.servers", "localhost:7070"); - configB = KylinConfig.getKylinConfigFromInputStream(KylinConfig.getKylinPropertiesAsInputSteam()); - configB.setProperty("kylin.rest.servers", "localhost:7070"); - configB.setMetadataUrl("../examples/test_metadata"); - - server = new Server(7070); - ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); - context.setContextPath("/"); - server.setHandler(context); - - final CacheService serviceA = new CacheService() { - @Override - public KylinConfig getConfig() { - return configA; - } - }; - final CacheService serviceB = new CacheService() { - @Override - public KylinConfig getConfig() { - return configB; - } - }; - - final CubeService cubeServiceA = new CubeService() { - @Override - public KylinConfig getConfig() { - return configA; - } - }; - final CubeService cubeServiceB = new CubeService() { - @Override - public KylinConfig getConfig() { - return configB; - } - }; - - serviceA.setCubeService(cubeServiceA); - serviceA.initCubeChangeListener(); - serviceB.setCubeService(cubeServiceB); - serviceB.initCubeChangeListener(); - - context.addServlet(new ServletHolder(new BroadcasterReceiveServlet(new BroadcasterReceiveServlet.BroadcasterHandler() { - @Override - public void handle(String type, String name, String event) { - - Broadcaster.TYPE wipeType = Broadcaster.TYPE.getType(type); - Broadcaster.EVENT wipeEvent = Broadcaster.EVENT.getEvent(event); - final String log = "wipe cache type: " + wipeType + " event:" + wipeEvent + " name:" + name; - logger.info(log); - try { - switch (wipeEvent) { - case CREATE: - case UPDATE: - serviceA.rebuildCache(wipeType, name); - serviceB.rebuildCache(wipeType, name); - break; - case DROP: - serviceA.removeCache(wipeType, name); - 
serviceB.removeCache(wipeType, name); - break; - default: - throw new RuntimeException("invalid type:" + wipeEvent); - } - } finally { - counter.incrementAndGet(); - } - } - })), "/"); - - server.start(); - } - - @AfterClass - public static void afterClass() throws Exception { - server.stop(); - cleanAfterClass(); - } - - @Before - public void setUp() throws Exception { - counter.set(0L); - createTestMetadata(); - } - - @After - public void after() throws Exception { - cleanupTestMetadata(); - } - - private void waitForCounterAndClear(long count) { - int retryTimes = 0; - while ((!counter.compareAndSet(count, 0L))) { - if (++retryTimes > 30) { - throw new RuntimeException("timeout"); - } - try { - Thread.sleep(100L); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - - private static CubeManager getCubeManager(KylinConfig config) throws Exception { - return CubeManager.getInstance(config); - } - - private static ProjectManager getProjectManager(KylinConfig config) throws Exception { - return ProjectManager.getInstance(config); - } - - private static CubeDescManager getCubeDescManager(KylinConfig config) throws Exception { - return CubeDescManager.getInstance(config); - } - - private static MetadataManager getMetadataManager(KylinConfig config) throws Exception { - return MetadataManager.getInstance(config); - } - - @Test - public void testBasic() throws Exception { - assertTrue(!configA.equals(configB)); - - assertNotNull(getCubeManager(configA)); - assertNotNull(getCubeManager(configB)); - assertNotNull(getCubeDescManager(configA)); - assertNotNull(getCubeDescManager(configB)); - assertNotNull(getProjectManager(configB)); - assertNotNull(getProjectManager(configB)); - assertNotNull(getMetadataManager(configB)); - assertNotNull(getMetadataManager(configB)); - - assertTrue(!getCubeManager(configA).equals(getCubeManager(configB))); - assertTrue(!getCubeDescManager(configA).equals(getCubeDescManager(configB))); - 
assertTrue(!getProjectManager(configA).equals(getProjectManager(configB))); - assertTrue(!getMetadataManager(configA).equals(getMetadataManager(configB))); - - assertEquals(getProjectManager(configA).listAllProjects().size(), getProjectManager(configB).listAllProjects().size()); - } - - @Test - public void testCubeCRUD() throws Exception { - final Broadcaster broadcaster = Broadcaster.getInstance(configA); - broadcaster.getCounterAndClear(); - - getStore().deleteResource("/cube/a_whole_new_cube.json"); - - //create cube - - final String cubeName = "a_whole_new_cube"; - final CubeManager cubeManager = getCubeManager(configA); - final CubeManager cubeManagerB = getCubeManager(configB); - final ProjectManager projectManager = getProjectManager(configA); - final ProjectManager projectManagerB = getProjectManager(configB); - final CubeDescManager cubeDescManager = getCubeDescManager(configA); - final CubeDescManager cubeDescManagerB = getCubeDescManager(configB); - final CubeDesc cubeDesc = getCubeDescManager(configA).getCubeDesc("test_kylin_cube_with_slr_desc"); - - assertTrue(cubeManager.getCube(cubeName) == null); - assertTrue(cubeManagerB.getCube(cubeName) == null); - assertTrue(!containsRealization(projectManager.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName)); - assertTrue(!containsRealization(projectManagerB.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName)); - cubeManager.createCube(cubeName, ProjectInstance.DEFAULT_PROJECT_NAME, cubeDesc, null); - //one for cube update, one for project update - assertEquals(2, broadcaster.getCounterAndClear()); - waitForCounterAndClear(2); - - assertNotNull(cubeManager.getCube(cubeName)); - assertNotNull(cubeManagerB.getCube(cubeName)); - assertTrue(containsRealization(projectManager.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName)); - 
assertTrue(containsRealization(projectManagerB.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName)); - - //update cube - CubeInstance cube = cubeManager.getCube(cubeName); - assertEquals(0, cube.getSegments().size()); - assertEquals(0, cubeManagerB.getCube(cubeName).getSegments().size()); - CubeSegment segment = new CubeSegment(); - segment.setName("test_segment"); - CubeUpdate cubeBuilder = new CubeUpdate(cube); - cubeBuilder.setToAddSegs(segment); - cube = cubeManager.updateCube(cubeBuilder); - //one for cube update - assertEquals(1, broadcaster.getCounterAndClear()); - waitForCounterAndClear(1); - assertEquals(1, cubeManagerB.getCube(cubeName).getSegments().size()); - assertEquals(segment.getName(), cubeManagerB.getCube(cubeName).getSegments().get(0).getName()); - - //delete cube - cubeManager.dropCube(cubeName, false); - //one for cube update, one for project update - assertEquals(2, broadcaster.getCounterAndClear()); - waitForCounterAndClear(2); - - assertTrue(cubeManager.getCube(cubeName) == null); - assertTrue(!containsRealization(projectManager.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName)); - assertTrue(cubeManagerB.getCube(cubeName) == null); - assertTrue(!containsRealization(projectManagerB.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName)); - - final String cubeDescName = "test_cube_desc"; - cubeDesc.setName(cubeDescName); - cubeDesc.setLastModified(0); - assertTrue(cubeDescManager.getCubeDesc(cubeDescName) == null); - assertTrue(cubeDescManagerB.getCubeDesc(cubeDescName) == null); - cubeDescManager.createCubeDesc(cubeDesc); - //one for add cube desc - assertEquals(1, broadcaster.getCounterAndClear()); - waitForCounterAndClear(1); - assertNotNull(cubeDescManager.getCubeDesc(cubeDescName)); - assertNotNull(cubeDescManagerB.getCubeDesc(cubeDescName)); - - cubeDesc.setNotifyList(Arrays.asList("test@email", "test@email", 
"test@email")); - cubeDescManager.updateCubeDesc(cubeDesc); - assertEquals(1, broadcaster.getCounterAndClear()); - waitForCounterAndClear(1); - assertEquals(cubeDesc.getNotifyList(), cubeDescManagerB.getCubeDesc(cubeDescName).getNotifyList()); - - cubeDescManager.removeCubeDesc(cubeDesc); - //one for add cube desc - assertEquals(1, broadcaster.getCounterAndClear()); - waitForCounterAndClear(1); - assertTrue(cubeDescManager.getCubeDesc(cubeDescName) == null); - assertTrue(cubeDescManagerB.getCubeDesc(cubeDescName) == null); - - getStore().deleteResource("/cube/a_whole_new_cube.json"); - } - - private TableDesc createTestTableDesc() { - TableDesc tableDesc = new TableDesc(); - tableDesc.setDatabase("TEST_DB"); - tableDesc.setName("TEST_TABLE"); - tableDesc.setUuid(UUID.randomUUID().toString()); - tableDesc.setLastModified(0); - return tableDesc; - } - - @Test - public void testMetaCRUD() throws Exception { - final MetadataManager metadataManager = MetadataManager.getInstance(configA); - final MetadataManager metadataManagerB = MetadataManager.getInstance(configB); - final Broadcaster broadcaster = Broadcaster.getInstance(configA); - broadcaster.getCounterAndClear(); - - TableDesc tableDesc = createTestTableDesc(); - assertTrue(metadataManager.getTableDesc(tableDesc.getIdentity()) == null); - assertTrue(metadataManagerB.getTableDesc(tableDesc.getIdentity()) == null); - metadataManager.saveSourceTable(tableDesc); - //only one for table insert - assertEquals(1, broadcaster.getCounterAndClear()); - waitForCounterAndClear(1); - assertNotNull(metadataManager.getTableDesc(tableDesc.getIdentity())); - assertNotNull(metadataManagerB.getTableDesc(tableDesc.getIdentity())); - - final String dataModelName = "test_data_model"; - DataModelDesc dataModelDesc = metadataManager.getDataModelDesc("test_kylin_left_join_model_desc"); - dataModelDesc.setName(dataModelName); - dataModelDesc.setLastModified(0); - assertTrue(metadataManager.getDataModelDesc(dataModelName) == null); - 
assertTrue(metadataManagerB.getDataModelDesc(dataModelName) == null); - - dataModelDesc.setName(dataModelName); - metadataManager.createDataModelDesc(dataModelDesc, "default", "ADMIN"); - //one for data model creation, one for project meta update - assertEquals(2, broadcaster.getCounterAndClear()); - waitForCounterAndClear(2); - assertEquals(dataModelDesc.getName(), metadataManagerB.getDataModelDesc(dataModelName).getName()); - - final LookupDesc[] lookups = dataModelDesc.getLookups(); - assertTrue(lookups.length > 0); - dataModelDesc.setLookups(lookups); - metadataManager.updateDataModelDesc(dataModelDesc); - //only one for data model update - assertEquals(1, broadcaster.getCounterAndClear()); - waitForCounterAndClear(1); - assertEquals(dataModelDesc.getLookups().length, metadataManagerB.getDataModelDesc(dataModelName).getLookups().length); - - } - - private boolean containsRealization(Set<IRealization> realizations, RealizationType type, String name) { - for (IRealization realization : realizations) { - if (realization.getType() == type && realization.getName().equals(name)) { - return true; - } - } - return false; - } - -} +///* +// * Licensed to the Apache Software Foundation (ASF) under one +// * or more contributor license agreements. See the NOTICE file +// * distributed with this work for additional information +// * regarding copyright ownership. The ASF licenses this file +// * to you under the Apache License, Version 2.0 (the +// * "License"); you may not use this file except in compliance +// * with the License. You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. 
+//*/ +// +//package org.apache.kylin.rest.service; +// +//import org.apache.kylin.common.KylinConfig; +//import org.apache.kylin.common.restclient.Broadcaster; +//import org.apache.kylin.common.util.LocalFileMetadataTestCase; +//import org.apache.kylin.cube.*; +//import org.apache.kylin.cube.model.CubeDesc; +//import org.apache.kylin.metadata.MetadataManager; +//import org.apache.kylin.metadata.model.DataModelDesc; +//import org.apache.kylin.metadata.model.LookupDesc; +//import org.apache.kylin.metadata.model.TableDesc; +//import org.apache.kylin.metadata.project.ProjectInstance; +//import org.apache.kylin.metadata.project.ProjectManager; +//import org.apache.kylin.metadata.realization.IRealization; +//import org.apache.kylin.metadata.realization.RealizationType; +//import org.apache.kylin.rest.broadcaster.BroadcasterReceiveServlet; +//import org.eclipse.jetty.server.Server; +//import org.eclipse.jetty.servlet.ServletContextHandler; +//import org.eclipse.jetty.servlet.ServletHolder; +//import org.junit.*; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +// +//import java.util.Arrays; +//import java.util.Set; +//import java.util.UUID; +//import java.util.concurrent.atomic.AtomicLong; +// +//import static org.junit.Assert.*; +// +///** +// */ +//public class CacheServiceTest extends LocalFileMetadataTestCase { +// +// private static Server server; +// +// private static String ZK_ADDRESS = "localhost:2199"; +// +// private static KylinConfig configA; +// private static KylinConfig configB; +// +// private static final Logger logger = LoggerFactory.getLogger(CacheServiceTest.class); +// +// private static AtomicLong counter = new AtomicLong(); +// +// @BeforeClass +// public static void beforeClass() throws Exception { +// staticCreateTestMetadata(); +// configA = KylinConfig.getInstanceFromEnv(); +// configA.setProperty("kylin.rest.servers", "localhost:7070"); +// configB = 
KylinConfig.getKylinConfigFromInputStream(KylinConfig.getKylinPropertiesAsInputSteam()); +// configB.setProperty("kylin.rest.servers", "localhost:7070"); +// configB.setMetadataUrl("../examples/test_metadata"); +// +// server = new Server(7070); +// ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); +// context.setContextPath("/"); +// server.setHandler(context); +// +// final CacheService serviceA = new CacheService() { +// @Override +// public KylinConfig getConfig() { +// return configA; +// } +// }; +// final CacheService serviceB = new CacheService() { +// @Override +// public KylinConfig getConfig() { +// return configB; +// } +// }; +// +// final CubeService cubeServiceA = new CubeService() { +// @Override +// public KylinConfig getConfig() { +// return configA; +// } +// }; +// final CubeService cubeServiceB = new CubeService() { +// @Override +// public KylinConfig getConfig() { +// return configB; +// } +// }; +// +// serviceA.setCubeService(cubeServiceA); +// serviceA.initCubeChangeListener(); +// serviceB.setCubeService(cubeServiceB); +// serviceB.initCubeChangeListener(); +// +// context.addServlet(new ServletHolder(new BroadcasterReceiveServlet(new BroadcasterReceiveServlet.BroadcasterHandler() { +// @Override +// public void handle(String type, String name, String event) { +// +// Broadcaster.TYPE wipeType = Broadcaster.TYPE.getType(type); +// Broadcaster.EVENT wipeEvent = Broadcaster.EVENT.getEvent(event); +// final String log = "wipe cache type: " + wipeType + " event:" + wipeEvent + " name:" + name; +// logger.info(log); +// try { +// switch (wipeEvent) { +// case CREATE: +// case UPDATE: +// serviceA.rebuildCache(wipeType, name); +// serviceB.rebuildCache(wipeType, name); +// break; +// case DROP: +// serviceA.removeCache(wipeType, name); +// serviceB.removeCache(wipeType, name); +// break; +// default: +// throw new RuntimeException("invalid type:" + wipeEvent); +// } +// } finally { +// 
counter.incrementAndGet(); +// } +// } +// })), "/"); +// +// server.start(); +// } +// +// @AfterClass +// public static void afterClass() throws Exception { +// server.stop(); +// cleanAfterClass(); +// } +// +// @Before +// public void setUp() throws Exception { +// counter.set(0L); +// createTestMetadata(); +// } +// +// @After +// public void after() throws Exception { +// cleanupTestMetadata(); +// } +// +// private void waitForCounterAndClear(long count) { +// int retryTimes = 0; +// while ((!counter.compareAndSet(count, 0L))) { +// if (++retryTimes > 30) { +// throw new RuntimeException("timeout"); +// } +// try { +// Thread.sleep(100L); +// } catch (InterruptedException e) { +// e.printStackTrace(); +// } +// } +// } +// +// private static CubeManager getCubeManager(KylinConfig config) throws Exception { +// return CubeManager.getInstance(config); +// } +// +// private static ProjectManager getProjectManager(KylinConfig config) throws Exception { +// return ProjectManager.getInstance(config); +// } +// +// private static CubeDescManager getCubeDescManager(KylinConfig config) throws Exception { +// return CubeDescManager.getInstance(config); +// } +// +// private static MetadataManager getMetadataManager(KylinConfig config) throws Exception { +// return MetadataManager.getInstance(config); +// } +// +// @Test +// public void testBasic() throws Exception { +// assertTrue(!configA.equals(configB)); +// +// assertNotNull(getCubeManager(configA)); +// assertNotNull(getCubeManager(configB)); +// assertNotNull(getCubeDescManager(configA)); +// assertNotNull(getCubeDescManager(configB)); +// assertNotNull(getProjectManager(configB)); +// assertNotNull(getProjectManager(configB)); +// assertNotNull(getMetadataManager(configB)); +// assertNotNull(getMetadataManager(configB)); +// +// assertTrue(!getCubeManager(configA).equals(getCubeManager(configB))); +// assertTrue(!getCubeDescManager(configA).equals(getCubeDescManager(configB))); +// 
assertTrue(!getProjectManager(configA).equals(getProjectManager(configB))); +// assertTrue(!getMetadataManager(configA).equals(getMetadataManager(configB))); +// +// assertEquals(getProjectManager(configA).listAllProjects().size(), getProjectManager(configB).listAllProjects().size()); +// } +// +// @Test +// public void testCubeCRUD() throws Exception { +// final Broadcaster broadcaster = Broadcaster.getInstance(configA); +// broadcaster.getCounterAndClear(); +// +// getStore().deleteResource("/cube/a_whole_new_cube.json"); +// +// //create cube +// +// final String cubeName = "a_whole_new_cube"; +// final CubeManager cubeManager = getCubeManager(configA); +// final CubeManager cubeManagerB = getCubeManager(configB); +// final ProjectManager projectManager = getProjectManager(configA); +// final ProjectManager projectManagerB = getProjectManager(configB); +// final CubeDescManager cubeDescManager = getCubeDescManager(configA); +// final CubeDescManager cubeDescManagerB = getCubeDescManager(configB); +// final CubeDesc cubeDesc = getCubeDescManager(configA).getCubeDesc("test_kylin_cube_with_slr_desc"); +// +// assertTrue(cubeManager.getCube(cubeName) == null); +// assertTrue(cubeManagerB.getCube(cubeName) == null); +// assertTrue(!containsRealization(projectManager.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName)); +// assertTrue(!containsRealization(projectManagerB.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName)); +// cubeManager.createCube(cubeName, ProjectInstance.DEFAULT_PROJECT_NAME, cubeDesc, null); +// //one for cube update, one for project update +// assertEquals(2, broadcaster.getCounterAndClear()); +// waitForCounterAndClear(2); +// +// assertNotNull(cubeManager.getCube(cubeName)); +// assertNotNull(cubeManagerB.getCube(cubeName)); +// assertTrue(containsRealization(projectManager.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName)); 
+// assertTrue(containsRealization(projectManagerB.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName)); +// +// //update cube +// CubeInstance cube = cubeManager.getCube(cubeName); +// assertEquals(0, cube.getSegments().size()); +// assertEquals(0, cubeManagerB.getCube(cubeName).getSegments().size()); +// CubeSegment segment = new CubeSegment(); +// segment.setName("test_segment"); +// CubeUpdate cubeBuilder = new CubeUpdate(cube); +// cubeBuilder.setToAddSegs(segment); +// cube = cubeManager.updateCube(cubeBuilder); +// //one for cube update +// assertEquals(1, broadcaster.getCounterAndClear()); +// waitForCounterAndClear(1); +// assertEquals(1, cubeManagerB.getCube(cubeName).getSegments().size()); +// assertEquals(segment.getName(), cubeManagerB.getCube(cubeName).getSegments().get(0).getName()); +// +// //delete cube +// cubeManager.dropCube(cubeName, false); +// //one for cube update, one for project update +// assertEquals(2, broadcaster.getCounterAndClear()); +// waitForCounterAndClear(2); +// +// assertTrue(cubeManager.getCube(cubeName) == null); +// assertTrue(!containsRealization(projectManager.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName)); +// assertTrue(cubeManagerB.getCube(cubeName) == null); +// assertTrue(!containsRealization(projectManagerB.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName)); +// +// final String cubeDescName = "test_cube_desc"; +// cubeDesc.setName(cubeDescName); +// cubeDesc.setLastModified(0); +// assertTrue(cubeDescManager.getCubeDesc(cubeDescName) == null); +// assertTrue(cubeDescManagerB.getCubeDesc(cubeDescName) == null); +// cubeDescManager.createCubeDesc(cubeDesc); +// //one for add cube desc +// assertEquals(1, broadcaster.getCounterAndClear()); +// waitForCounterAndClear(1); +// assertNotNull(cubeDescManager.getCubeDesc(cubeDescName)); +// assertNotNull(cubeDescManagerB.getCubeDesc(cubeDescName)); 
+// +// cubeDesc.setNotifyList(Arrays.asList("test@email", "test@email", "test@email")); +// cubeDescManager.updateCubeDesc(cubeDesc); +// assertEquals(1, broadcaster.getCounterAndClear()); +// waitForCounterAndClear(1); +// assertEquals(cubeDesc.getNotifyList(), cubeDescManagerB.getCubeDesc(cubeDescName).getNotifyList()); +// +// cubeDescManager.removeCubeDesc(cubeDesc); +// //one for add cube desc +// assertEquals(1, broadcaster.getCounterAndClear()); +// waitForCounterAndClear(1); +// assertTrue(cubeDescManager.getCubeDesc(cubeDescName) == null); +// assertTrue(cubeDescManagerB.getCubeDesc(cubeDescName) == null); +// +// getStore().deleteResource("/cube/a_whole_new_cube.json"); +// } +// +// private TableDesc createTestTableDesc() { +// TableDesc tableDesc = new TableDesc(); +// tableDesc.setDatabase("TEST_DB"); +// tableDesc.setName("TEST_TABLE"); +// tableDesc.setUuid(UUID.randomUUID().toString()); +// tableDesc.setLastModified(0); +// return tableDesc; +// } +// +// @Test +// public void testMetaCRUD() throws Exception { +// final MetadataManager metadataManager = MetadataManager.getInstance(configA); +// final MetadataManager metadataManagerB = MetadataManager.getInstance(configB); +// final Broadcaster broadcaster = Broadcaster.getInstance(configA); +// broadcaster.getCounterAndClear(); +// +// TableDesc tableDesc = createTestTableDesc(); +// assertTrue(metadataManager.getTableDesc(tableDesc.getIdentity()) == null); +// assertTrue(metadataManagerB.getTableDesc(tableDesc.getIdentity()) == null); +// metadataManager.saveSourceTable(tableDesc); +// //only one for table insert +// assertEquals(1, broadcaster.getCounterAndClear()); +// waitForCounterAndClear(1); +// assertNotNull(metadataManager.getTableDesc(tableDesc.getIdentity())); +// assertNotNull(metadataManagerB.getTableDesc(tableDesc.getIdentity())); +// +// final String dataModelName = "test_data_model"; +// DataModelDesc dataModelDesc = 
metadataManager.getDataModelDesc("test_kylin_left_join_model_desc"); +// dataModelDesc.setName(dataModelName); +// dataModelDesc.setLastModified(0); +// assertTrue(metadataManager.getDataModelDesc(dataModelName) == null); +// assertTrue(metadataManagerB.getDataModelDesc(dataModelName) == null); +// +// dataModelDesc.setName(dataModelName); +// metadataManager.createDataModelDesc(dataModelDesc, "default", "ADMIN"); +// //one for data model creation, one for project meta update +// assertEquals(2, broadcaster.getCounterAndClear()); +// waitForCounterAndClear(2); +// assertEquals(dataModelDesc.getName(), metadataManagerB.getDataModelDesc(dataModelName).getName()); +// +// final LookupDesc[] lookups = dataModelDesc.getLookups(); +// assertTrue(lookups.length > 0); +// dataModelDesc.setLookups(lookups); +// metadataManager.updateDataModelDesc(dataModelDesc); +// //only one for data model update +// assertEquals(1, broadcaster.getCounterAndClear()); +// waitForCounterAndClear(1); +// assertEquals(dataModelDesc.getLookups().length, metadataManagerB.getDataModelDesc(dataModelName).getLookups().length); +// +// } +// +// private boolean containsRealization(Set<IRealization> realizations, RealizationType type, String name) { +// for (IRealization realization : realizations) { +// if (realization.getType() == type && realization.getName().equals(name)) { +// return true; +// } +// } +// return false; +// } +// +//} http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java ---------------------------------------------------------------------- diff --git a/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java b/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java index f8dc945..ca4fe39 100644 --- a/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java +++ b/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java @@ -18,6 +18,12 @@ 
package org.apache.kylin.rest.service; +import com.google.common.collect.Lists; +import org.I0Itec.zkclient.IDefaultNameSpace; +import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.ZkServer; +import org.apache.hadoop.fs.FileUtil; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.apache.kylin.cube.CubeDescManager; import org.apache.kylin.cube.CubeManager; @@ -26,42 +32,42 @@ import org.apache.kylin.invertedindex.IIManager; import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.project.ProjectManager; import org.apache.kylin.metadata.realization.RealizationRegistry; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; +import org.apache.kylin.rest.helix.HelixClusterAdmin; +import org.junit.*; import org.junit.runner.RunWith; import org.springframework.security.authentication.TestingAuthenticationToken; import org.springframework.security.core.Authentication; import org.springframework.security.core.context.SecurityContextHolder; +import org.springframework.security.core.userdetails.User; +import org.springframework.security.core.userdetails.UserDetails; import org.springframework.test.context.ActiveProfiles; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; +import java.io.File; +import java.util.Arrays; +import java.util.List; + /** * @author xduo */ @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = { "classpath:applicationContext.xml", "classpath:kylinSecurity.xml" }) @ActiveProfiles("testing") -public class ServiceTestBase extends LocalFileMetadataTestCase { - - @BeforeClass - public static void setupResource() throws Exception { - staticCreateTestMetadata(); - Authentication authentication = new TestingAuthenticationToken("ADMIN", "ADMIN", "ROLE_ADMIN"); - 
SecurityContextHolder.getContext().setAuthentication(authentication); - } - - @AfterClass - public static void tearDownResource() { - } +public class ServiceTestBase extends TestBaseWithZookeeper { @Before public void setup() throws Exception { this.createTestMetadata(); + UserService.UserGrantedAuthority userGrantedAuthority = new UserService.UserGrantedAuthority(); + userGrantedAuthority.setAuthority("ROLE_ADMIN"); + UserDetails user = new User("ADMIN", "skippped-ldap", Lists.newArrayList(userGrantedAuthority)); + Authentication authentication = new TestingAuthenticationToken(user, "ADMIN", "ROLE_ADMIN"); + SecurityContextHolder.getContext().setAuthentication(authentication); + KylinConfig kylinConfig = this.getTestConfig(); + kylinConfig.setRestAddress("localhost:7070"); + MetadataManager.clearCache(); CubeDescManager.clearCache(); CubeManager.clearCache();
