KYLIN-1311 Stream cubing auto assignment and load balance
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/782a3e5e Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/782a3e5e Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/782a3e5e Branch: refs/heads/helix-rebase Commit: 782a3e5e4ec8127b1cb485b08edf3b1aec7332e3 Parents: 9274aa8 Author: shaofengshi <shaofeng...@apache.org> Authored: Thu Jan 14 14:59:54 2016 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Wed Mar 2 17:24:11 2016 +0800 ---------------------------------------------------------------------- .../kylin/engine/streaming/BootstrapConfig.java | 8 -- .../engine/streaming/cli/StreamingCLI.java | 3 - .../kylin/rest/controller/CubeController.java | 5 ++ .../rest/controller/StreamingController.java | 50 +++++++++++++ .../kylin/rest/helix/HelixClusterAdmin.java | 13 +++- .../helix/LeaderStandbyStateModelFactory.java | 43 +++++++---- .../rest/request/StreamingBuildRequest.java | 77 ++++++++++++++++++++ .../kylin/rest/request/StreamingRequest.java | 4 +- .../kylin/rest/service/StreamingService.java | 27 +++++++ 9 files changed, 201 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/782a3e5e/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java index a3e2db5..2b83b84 100644 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java +++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java @@ -36,14 +36,6 @@ public class BootstrapConfig { this.streaming = streaming; } - public int getPartitionId() { - return partitionId; - } - - public void setPartitionId(int partitionId) { - this.partitionId = partitionId; - } - public boolean isFillGap() { return fillGap; } http://git-wip-us.apache.org/repos/asf/kylin/blob/782a3e5e/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java index a73a6ac..96ad1ad 100644 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java +++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java @@ -72,9 +72,6 @@ public class StreamingCLI { case "-streaming": bootstrapConfig.setStreaming(args[++i]); break; - case "-partition": - bootstrapConfig.setPartitionId(Integer.parseInt(args[++i])); - break; case "-fillGap": bootstrapConfig.setFillGap(Boolean.parseBoolean(args[++i])); break; http://git-wip-us.apache.org/repos/asf/kylin/blob/782a3e5e/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java index 9afa750..4ab640f 100644 --- a/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java +++ b/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java @@ -27,14 +27,19 @@ import java.util.Map; import java.util.UUID; import org.apache.commons.lang.StringUtils; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.JsonUtil; +import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.CubeUpdate; import org.apache.kylin.cube.model.CubeBuildTypeEnum; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc; +import org.apache.kylin.engine.streaming.BootstrapConfig; import org.apache.kylin.engine.streaming.StreamingConfig; +import org.apache.kylin.engine.streaming.StreamingManager; +import org.apache.kylin.engine.streaming.monitor.StreamingMonitor; import org.apache.kylin.job.JobInstance; import org.apache.kylin.job.JoinedFlatTable; import org.apache.kylin.job.exception.JobException; http://git-wip-us.apache.org/repos/asf/kylin/blob/782a3e5e/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java index e22bd30..57831d5 100644 --- a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java +++ b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java @@ -21,14 +21,23 @@ package org.apache.kylin.rest.controller; import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonMappingException; +import com.google.common.base.Preconditions; import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.util.JsonUtil; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.model.CubeBuildTypeEnum; +import org.apache.kylin.engine.streaming.BootstrapConfig; import org.apache.kylin.engine.streaming.StreamingConfig; +import org.apache.kylin.job.JobInstance; +import org.apache.kylin.job.exception.JobException; import org.apache.kylin.rest.exception.BadRequestException; import org.apache.kylin.rest.exception.ForbiddenException; import org.apache.kylin.rest.exception.InternalErrorException; import org.apache.kylin.rest.exception.NotFoundException; +import org.apache.kylin.rest.request.StreamingBuildRequest; import org.apache.kylin.rest.request.StreamingRequest; +import org.apache.kylin.rest.service.CubeService; import org.apache.kylin.rest.service.KafkaConfigService; import org.apache.kylin.rest.service.StreamingService; import org.apache.kylin.source.kafka.config.KafkaConfig; @@ -36,6 +45,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.security.access.AccessDeniedException; +import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.*; @@ -58,6 +68,9 @@ public class StreamingController extends BasicController { @Autowired private KafkaConfigService kafkaConfigService; + @Autowired + private CubeService cubeService; + @RequestMapping(value = "/getConfig", method = { RequestMethod.GET }) @ResponseBody public List<StreamingConfig> getStreamings(@RequestParam(value = "cubeName", required = false) String cubeName, @RequestParam(value = "limit", required = false) Integer limit, @RequestParam(value = "offset", required = false) Integer offset) { @@ -214,6 +227,43 @@ public class StreamingController extends BasicController { request.setMessage(message); } + + + /** + * Send a stream build request + * + * @param cubeName Cube ID + * @return + * @throws IOException + */ + @RequestMapping(value = "/{streamingName}/build", method = {RequestMethod.PUT}) + @ResponseBody + public StreamingBuildRequest buildStream(@PathVariable String streamingName, @RequestBody StreamingBuildRequest streamingBuildRequest) { + streamingBuildRequest.setStreaming(streamingName); + StreamingConfig streamingConfig = streamingService.getStreamingManager().getConfig(streamingName); + Preconditions.checkNotNull(streamingConfig, "Stream config '" + streamingName + "' is not found."); + String cubeName = streamingConfig.getCubeName(); + List<CubeInstance> cubes = cubeService.getCubes(cubeName, null, null, null, null); + Preconditions.checkArgument(cubes.size() == 1, "Cube '" + cubeName + "' is not found."); + CubeInstance cube = cubes.get(0); + if (streamingBuildRequest.isFillGap() == false) { + Preconditions.checkArgument(streamingBuildRequest.getEnd() > streamingBuildRequest.getStart(), "End time should be greater than start time."); + for (CubeSegment segment : cube.getSegments()) { + if (segment.getDateRangeStart() <= streamingBuildRequest.getStart() && segment.getDateRangeEnd() >= streamingBuildRequest.getEnd()) { + streamingBuildRequest.setMessage("The segment already exists: " + segment.toString()); + streamingBuildRequest.setSuccessful(false); + return streamingBuildRequest; + } + } + } + + streamingService.buildStream(streamingName, streamingBuildRequest); + streamingBuildRequest.setMessage("Build request is submitted successfully."); + streamingBuildRequest.setSuccessful(true); + return streamingBuildRequest; + + } + public void setStreamingService(StreamingService streamingService) { this.streamingService= streamingService; } http://git-wip-us.apache.org/repos/asf/kylin/blob/782a3e5e/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java b/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java index 6300383..f62204d 100644 --- a/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java +++ b/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java @@ -45,7 +45,7 @@ import java.util.concurrent.ConcurrentMap; public class HelixClusterAdmin { public static final String RESOURCE_NAME_JOB_ENGINE = "Resource_JobEngine"; - public static final String RESOURCE_STREAME_CUBE_PREFIX = "Resource_Streame_"; + public static final String RESOURCE_STREAME_CUBE_PREFIX = "Resource_Stream_"; public static final String MODEL_LEADER_STANDBY = "LeaderStandby"; public static final String MODEL_ONLINE_OFFLINE = "OnlineOffline"; @@ -115,15 +115,22 @@ public class HelixClusterAdmin { } - public void addStreamCubeSlice(String cubeName, long start, long end) { - String resourceName = RESOURCE_STREAME_CUBE_PREFIX + cubeName + "_" + start + "_" + end; + public void addStreamingJob(String streamingName, long start, long end) { + String resourceName = RESOURCE_STREAME_CUBE_PREFIX + streamingName + "_" + start + "_" + end; if (!admin.getResourcesInCluster(clusterName).contains(resourceName)) { admin.addResource(clusterName, resourceName, 1, MODEL_LEADER_STANDBY, IdealState.RebalanceMode.SEMI_AUTO.name()); + } else { + logger.warn("Resource '" + resourceName + "' already exists in cluster, skip adding."); } admin.rebalance(clusterName, resourceName, 2, "", TAG_STREAM_BUILDER); } + + public void dropStreamingJob(String streamingName, long start, long end) { + String resourceName = RESOURCE_STREAME_CUBE_PREFIX + streamingName + "_" + start + "_" + end; + admin.dropResource(clusterName, resourceName); + } /** * Start the instance and register the state model factory http://git-wip-us.apache.org/repos/asf/kylin/blob/782a3e5e/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java b/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java index c2a78e7..df23ea0 100644 --- a/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java +++ b/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java @@ -9,21 +9,24 @@ import org.apache.helix.api.id.ResourceId; import org.apache.helix.model.Message; import org.apache.helix.participant.statemachine.Transition; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.engine.streaming.OneOffStreamingBuilder; -import org.apache.kylin.engine.streaming.cli.StreamingCLI; +import org.apache.kylin.common.KylinConfigBase; import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.job.impl.threadpool.DefaultScheduler; import org.apache.kylin.job.lock.MockJobLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; + import static org.apache.kylin.rest.helix.HelixClusterAdmin.RESOURCE_STREAME_CUBE_PREFIX; /** */ public class LeaderStandbyStateModelFactory extends StateTransitionHandlerFactory<TransitionHandler> { private static final Logger logger = LoggerFactory.getLogger(LeaderStandbyStateModelFactory.class); - + @Override public TransitionHandler createStateTransitionHandler(PartitionId partitionId) { if (partitionId.getResourceId().equals(ResourceId.from(HelixClusterAdmin.RESOURCE_NAME_JOB_ENGINE))) { @@ -38,7 +41,7 @@ public class LeaderStandbyStateModelFactory extends StateTransitionHandlerFactor } public static class JobEngineStateModel extends TransitionHandler { - + public static JobEngineStateModel INSTANCE = new JobEngineStateModel(); @Transition(to = "LEADER", from = "STANDBY") @@ -62,7 +65,7 @@ public class LeaderStandbyStateModelFactory extends StateTransitionHandlerFactor public void onBecomeStandbyFromLeader(Message message, NotificationContext context) { logger.info("JobEngineStateModel.onBecomeStandbyFromLeader()"); DefaultScheduler.destroyInstance(); - + } @Transition(to = "STANDBY", from = "OFFLINE") @@ -71,7 +74,6 @@ public class LeaderStandbyStateModelFactory extends StateTransitionHandlerFactor } - @Transition(to = "OFFLINE", from = "STANDBY") public void onBecomeOfflineFromStandby(Message message, NotificationContext context) { logger.info("JobEngineStateModel.onBecomeOfflineFromStandby()"); @@ -80,7 +82,7 @@ public class LeaderStandbyStateModelFactory extends StateTransitionHandlerFactor } public static class StreamCubeStateModel extends TransitionHandler { - + public static StreamCubeStateModel INSTANCE = new StreamCubeStateModel(); @Transition(to = "LEADER", from = "STANDBY") @@ -90,27 +92,40 @@ public class LeaderStandbyStateModelFactory extends StateTransitionHandlerFactor long end = Long.parseLong(resourceName.substring(resourceName.lastIndexOf("_")) + 1); String temp = resourceName.substring(RESOURCE_STREAME_CUBE_PREFIX.length(), resourceName.lastIndexOf("_")); long start = Long.parseLong(temp.substring(temp.lastIndexOf("_")) + 1); - String cubeName = temp.substring(0, temp.lastIndexOf("_")); + String streamingConfig = temp.substring(0, temp.lastIndexOf("_")); + + KylinConfigBase.getKylinHome(); + String segmentId = start + "_" + end; + String cmd = KylinConfigBase.getKylinHome() + "/bin/kylin.sh streaming start " + streamingConfig + " " + segmentId + " -oneoff true -start " + start + " -end " + end + " -streaming " + streamingConfig; + logger.info("Executing: " + cmd); + try { + String line; + Process p = Runtime.getRuntime().exec(cmd); + BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream())); + while ((line = input.readLine()) != null) { + logger.info(line); + } + input.close(); + } catch (IOException err) { + logger.error("Error happens during build streaming '" + resourceName + "'", err); + throw new RuntimeException(err); + } - final Runnable runnable = new OneOffStreamingBuilder(cubeName, start, end).build(); - runnable.run(); } @Transition(to = "STANDBY", from = "LEADER") public void onBecomeStandbyFromLeader(Message message, NotificationContext context) { - } @Transition(to = "STANDBY", from = "OFFLINE") public void onBecomeStandbyFromOffline(Message message, NotificationContext context) { - - } + } @Transition(to = "OFFLINE", from = "STANDBY") public void onBecomeOfflineFromStandby(Message message, NotificationContext context) { - + } } } http://git-wip-us.apache.org/repos/asf/kylin/blob/782a3e5e/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java b/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java new file mode 100644 index 0000000..e06a06c --- /dev/null +++ b/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.rest.request; + +public class StreamingBuildRequest { + + private String streaming; + private long start; + private long end; + private boolean fillGap; + private String message; + private boolean successful; + + public String getStreaming() { + return streaming; + } + + public void setStreaming(String streaming) { + this.streaming = streaming; + } + + public boolean isSuccessful() { + return successful; + } + + public void setSuccessful(boolean successful) { + this.successful = successful; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + public long getStart() { + return start; + } + + public void setStart(long start) { + this.start = start; + } + + public long getEnd() { + return end; + } + + public void setEnd(long end) { + this.end = end; + } + + public boolean isFillGap() { + return fillGap; + } + + public void setFillGap(boolean fillGap) { + this.fillGap = fillGap; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/782a3e5e/server/src/main/java/org/apache/kylin/rest/request/StreamingRequest.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/request/StreamingRequest.java b/server/src/main/java/org/apache/kylin/rest/request/StreamingRequest.java index 07c30f3..b737c3e 100644 --- a/server/src/main/java/org/apache/kylin/rest/request/StreamingRequest.java +++ b/server/src/main/java/org/apache/kylin/rest/request/StreamingRequest.java @@ -19,7 +19,9 @@ package org.apache.kylin.rest.request; -import java.lang.String;public class StreamingRequest { +import java.lang.String; + +public class StreamingRequest { private String project; http://git-wip-us.apache.org/repos/asf/kylin/blob/782a3e5e/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java b/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java index e40426b..da20949 100644 --- a/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java +++ b/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java @@ -18,12 +18,22 @@ package org.apache.kylin.rest.service; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.engine.streaming.BootstrapConfig; import org.apache.kylin.engine.streaming.StreamingConfig; +import org.apache.kylin.engine.streaming.StreamingManager; +import org.apache.kylin.engine.streaming.monitor.StreamingMonitor; import org.apache.kylin.rest.constant.Constant; import org.apache.kylin.rest.exception.InternalErrorException; +import org.apache.kylin.rest.helix.HelixClusterAdmin; +import org.apache.kylin.rest.request.StreamingBuildRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.security.access.prepost.PostFilter; +import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.stereotype.Component; import java.io.IOException; @@ -33,6 +43,7 @@ import java.util.List; @Component("streamingMgmtService") public class StreamingService extends BasicService { + private static final Logger logger = LoggerFactory.getLogger(StreamingService.class); @Autowired private AccessService accessService; @@ -87,4 +98,20 @@ public class StreamingService extends BasicService { getStreamingManager().removeStreamingConfig(config); } + + @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'OPERATION') or hasPermission(#cube, 'MANAGEMENT')") + public void buildStream(String cube, StreamingBuildRequest streamingBuildRequest) { + HelixClusterAdmin clusterAdmin = HelixClusterAdmin.getInstance(KylinConfig.getInstanceFromEnv()); + if (streamingBuildRequest.isFillGap()) { + final StreamingConfig streamingConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfig(streamingBuildRequest.getStreaming()); + final List<Pair<Long, Long>> gaps = StreamingMonitor.findGaps(streamingConfig.getCubeName()); + logger.info("all gaps:" + org.apache.commons.lang3.StringUtils.join(gaps, ",")); + for (Pair<Long, Long> gap : gaps) { + clusterAdmin.addStreamingJob(streamingBuildRequest.getStreaming(), gap.getFirst(), gap.getSecond()); + } + } else { + clusterAdmin.addStreamingJob(streamingBuildRequest.getStreaming(), streamingBuildRequest.getStart(), streamingBuildRequest.getEnd()); + } + } + }