KYLIN-1311 fix small bug

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/8838d9fd
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/8838d9fd
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/8838d9fd

Branch: refs/heads/helix-rebase
Commit: 8838d9fdbfba69ebd62cf8f30556253c706afb14
Parents: b2b17a6
Author: shaofengshi <shaofeng...@apache.org>
Authored: Fri Jan 15 17:57:26 2016 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Wed Mar 2 17:24:11 2016 +0800

----------------------------------------------------------------------
 .../engine/streaming/StreamingManager.java      | 11 +++++-----
 .../rest/controller/StreamingController.java    | 13 ++++++------
 .../helix/LeaderStandbyStateModelFactory.java   | 21 +++++++++++++++++---
 3 files changed, 30 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/8838d9fd/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
index e0b086d..5c1c11e 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
@@ -108,6 +108,12 @@ public class StreamingManager {
         return streamingMap.get(name);
     }
 
+    public StreamingConfig getStreamingConfigByCube(String cubeName) {
+        String streamingConfig = cubeName + "_streaming";
+        return getStreamingConfig(streamingConfig);
+    }
+
+
     public List<StreamingConfig> listAllStreaming() {
         return new ArrayList<>(streamingMap.values());
     }
@@ -139,11 +145,6 @@ public class StreamingManager {
         streamingMap.remove(streamingConfig.getName());
     }
 
-    public StreamingConfig getConfig(String name) {
-        name = name.toUpperCase();
-        return streamingMap.get(name);
-    }
-
     public void removeStreamingLocal(String streamingName) {
         streamingMap.removeLocal(streamingName);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/8838d9fd/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
index 57831d5..fb806d1 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
@@ -236,13 +236,11 @@ public class StreamingController extends BasicController {
      * @return
      * @throws IOException
      */
-    @RequestMapping(value = "/{streamingName}/build", method = 
{RequestMethod.PUT})
+    @RequestMapping(value = "/{cubeName}/build", method = {RequestMethod.PUT})
     @ResponseBody
-    public StreamingBuildRequest buildStream(@PathVariable String 
streamingName, @RequestBody StreamingBuildRequest streamingBuildRequest) {
-        streamingBuildRequest.setStreaming(streamingName);
-        StreamingConfig streamingConfig = 
streamingService.getStreamingManager().getConfig(streamingName);
-        Preconditions.checkNotNull(streamingConfig, "Stream config '" + 
streamingName + "' is not found.");
-        String cubeName = streamingConfig.getCubeName();
+    public StreamingBuildRequest buildStream(@PathVariable String cubeName, 
@RequestBody StreamingBuildRequest streamingBuildRequest) {
+        StreamingConfig streamingConfig = 
streamingService.getStreamingManager().getStreamingConfigByCube(cubeName);
+        Preconditions.checkNotNull(streamingConfig, "Stream config for '" + 
cubeName + "' is not found.");
         List<CubeInstance> cubes = cubeService.getCubes(cubeName, null, null, 
null, null);
         Preconditions.checkArgument(cubes.size() == 1, "Cube '" + cubeName + 
"' is not found.");
         CubeInstance cube = cubes.get(0);
@@ -257,7 +255,8 @@ public class StreamingController extends BasicController {
             }
         }
 
-        streamingService.buildStream(streamingName, streamingBuildRequest);
+        streamingBuildRequest.setStreaming(streamingConfig.getName());
+        streamingService.buildStream(cubeName, streamingBuildRequest);
         streamingBuildRequest.setMessage("Build request is submitted 
successfully.");
         streamingBuildRequest.setSuccessful(true);
         return streamingBuildRequest;

http://git-wip-us.apache.org/repos/asf/kylin/blob/8838d9fd/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java b/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
index df23ea0..8614e8c 100644
--- a/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
+++ b/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
@@ -10,6 +10,10 @@ import org.apache.helix.model.Message;
 import org.apache.helix.participant.statemachine.Transition;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.KylinConfigBase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.streaming.StreamingManager;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
 import org.apache.kylin.job.lock.MockJobLock;
@@ -48,7 +52,7 @@ public class LeaderStandbyStateModelFactory extends StateTransitionHandlerFactor
         public void onBecomeLeaderFromStandby(Message message, 
NotificationContext context) {
             logger.info("JobEngineStateModel.onBecomeLeaderFromStandby()");
             try {
-                KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+                final KylinConfig kylinConfig = 
KylinConfig.getInstanceFromEnv();
                 DefaultScheduler scheduler = DefaultScheduler.createInstance();
                 scheduler.init(new JobEngineConfig(kylinConfig), new 
MockJobLock());
                 while (!scheduler.hasStarted()) {
@@ -89,11 +93,22 @@ public class LeaderStandbyStateModelFactory extends StateTransitionHandlerFactor
         public void onBecomeLeaderFromStandby(Message message, 
NotificationContext context) {
             String resourceName = message.getResourceId().stringify();
             
Preconditions.checkArgument(resourceName.startsWith(RESOURCE_STREAME_CUBE_PREFIX));
-            long end = 
Long.parseLong(resourceName.substring(resourceName.lastIndexOf("_")) + 1);
+            long end = 
Long.parseLong(resourceName.substring(resourceName.lastIndexOf("_") + 1));
             String temp = 
resourceName.substring(RESOURCE_STREAME_CUBE_PREFIX.length(), 
resourceName.lastIndexOf("_"));
-            long start = Long.parseLong(temp.substring(temp.lastIndexOf("_")) 
+ 1);
+            long start = Long.parseLong(temp.substring(temp.lastIndexOf("_") + 
1));
             String streamingConfig = temp.substring(0, temp.lastIndexOf("_"));
 
+            final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+            
+            final String cubeName = 
StreamingManager.getInstance(kylinConfig).getStreamingConfig(streamingConfig).getCubeName();
+            final CubeInstance cube = 
CubeManager.getInstance(kylinConfig).getCube(cubeName);
+            for (CubeSegment segment : cube.getSegments()) {
+                if (segment.getDateRangeStart() <= start && 
segment.getDateRangeEnd() >= end) {
+                    logger.info("Segment " + segment.getName() + " already 
exist, no need rebuild.");
+                    return;
+                }
+            }
+            
             KylinConfigBase.getKylinHome();
             String segmentId = start + "_" + end;
             String cmd = KylinConfigBase.getKylinHome() + "/bin/kylin.sh 
streaming start " + streamingConfig + " " + segmentId + " -oneoff true -start " 
+ start + " -end " + end + " -streaming " + streamingConfig;

Reply via email to