KYLIN-1311 fix small bug
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/8838d9fd Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/8838d9fd Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/8838d9fd Branch: refs/heads/helix-rebase Commit: 8838d9fdbfba69ebd62cf8f30556253c706afb14 Parents: b2b17a6 Author: shaofengshi <shaofeng...@apache.org> Authored: Fri Jan 15 17:57:26 2016 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Wed Mar 2 17:24:11 2016 +0800 ---------------------------------------------------------------------- .../engine/streaming/StreamingManager.java | 11 +++++----- .../rest/controller/StreamingController.java | 13 ++++++------ .../helix/LeaderStandbyStateModelFactory.java | 21 +++++++++++++++++--- 3 files changed, 30 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/8838d9fd/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java index e0b086d..5c1c11e 100644 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java +++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java @@ -108,6 +108,12 @@ public class StreamingManager { return streamingMap.get(name); } + public StreamingConfig getStreamingConfigByCube(String cubeName) { + String streamingConfig = cubeName + "_streaming"; + return getStreamingConfig(streamingConfig); + } + + public List<StreamingConfig> listAllStreaming() { return new ArrayList<>(streamingMap.values()); } @@ -139,11 +145,6 @@ public class StreamingManager { streamingMap.remove(streamingConfig.getName()); } - public StreamingConfig getConfig(String name) { - name = name.toUpperCase(); - return streamingMap.get(name); - } - public void removeStreamingLocal(String streamingName) { streamingMap.removeLocal(streamingName); } http://git-wip-us.apache.org/repos/asf/kylin/blob/8838d9fd/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java index 57831d5..fb806d1 100644 --- a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java +++ b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java @@ -236,13 +236,11 @@ public class StreamingController extends BasicController { * @return * @throws IOException */ - @RequestMapping(value = "/{streamingName}/build", method = {RequestMethod.PUT}) + @RequestMapping(value = "/{cubeName}/build", method = {RequestMethod.PUT}) @ResponseBody - public StreamingBuildRequest buildStream(@PathVariable String streamingName, @RequestBody StreamingBuildRequest streamingBuildRequest) { - streamingBuildRequest.setStreaming(streamingName); - StreamingConfig streamingConfig = streamingService.getStreamingManager().getConfig(streamingName); - Preconditions.checkNotNull(streamingConfig, "Stream config '" + streamingName + "' is not found."); - String cubeName = streamingConfig.getCubeName(); + public StreamingBuildRequest buildStream(@PathVariable String cubeName, @RequestBody StreamingBuildRequest streamingBuildRequest) { + StreamingConfig streamingConfig = streamingService.getStreamingManager().getStreamingConfigByCube(cubeName); + Preconditions.checkNotNull(streamingConfig, "Stream config for '" + cubeName + "' is not found."); List<CubeInstance> cubes = cubeService.getCubes(cubeName, null, null, null, null); Preconditions.checkArgument(cubes.size() == 1, "Cube '" + cubeName + "' is not found."); CubeInstance cube = cubes.get(0); @@ -257,7 +255,8 @@ public class StreamingController extends BasicController { } } - streamingService.buildStream(streamingName, streamingBuildRequest); + streamingBuildRequest.setStreaming(streamingConfig.getName()); + streamingService.buildStream(cubeName, streamingBuildRequest); streamingBuildRequest.setMessage("Build request is submitted successfully."); streamingBuildRequest.setSuccessful(true); return streamingBuildRequest; http://git-wip-us.apache.org/repos/asf/kylin/blob/8838d9fd/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java b/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java index df23ea0..8614e8c 100644 --- a/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java +++ b/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java @@ -10,6 +10,10 @@ import org.apache.helix.model.Message; import org.apache.helix.participant.statemachine.Transition; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.KylinConfigBase; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.engine.streaming.StreamingManager; import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.job.impl.threadpool.DefaultScheduler; import org.apache.kylin.job.lock.MockJobLock; @@ -48,7 +52,7 @@ public class LeaderStandbyStateModelFactory extends StateTransitionHandlerFactor public void onBecomeLeaderFromStandby(Message message, NotificationContext context) { logger.info("JobEngineStateModel.onBecomeLeaderFromStandby()"); try { - KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); DefaultScheduler scheduler = DefaultScheduler.createInstance(); scheduler.init(new JobEngineConfig(kylinConfig), new MockJobLock()); while (!scheduler.hasStarted()) { @@ -89,11 +93,22 @@ public class LeaderStandbyStateModelFactory extends StateTransitionHandlerFactor public void onBecomeLeaderFromStandby(Message message, NotificationContext context) { String resourceName = message.getResourceId().stringify(); Preconditions.checkArgument(resourceName.startsWith(RESOURCE_STREAME_CUBE_PREFIX)); - long end = Long.parseLong(resourceName.substring(resourceName.lastIndexOf("_")) + 1); + long end = Long.parseLong(resourceName.substring(resourceName.lastIndexOf("_") + 1)); String temp = resourceName.substring(RESOURCE_STREAME_CUBE_PREFIX.length(), resourceName.lastIndexOf("_")); - long start = Long.parseLong(temp.substring(temp.lastIndexOf("_")) + 1); + long start = Long.parseLong(temp.substring(temp.lastIndexOf("_") + 1)); String streamingConfig = temp.substring(0, temp.lastIndexOf("_")); + final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + + final String cubeName = StreamingManager.getInstance(kylinConfig).getStreamingConfig(streamingConfig).getCubeName(); + final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(cubeName); + for (CubeSegment segment : cube.getSegments()) { + if (segment.getDateRangeStart() <= start && segment.getDateRangeEnd() >= end) { + logger.info("Segment " + segment.getName() + " already exist, no need rebuild."); + return; + } + } + KylinConfigBase.getKylinHome(); String segmentId = start + "_" + end; String cmd = KylinConfigBase.getKylinHome() + "/bin/kylin.sh streaming start " + streamingConfig + " " + segmentId + " -oneoff true -start " + start + " -end " + end + " -streaming " + streamingConfig;