http://git-wip-us.apache.org/repos/asf/kylin/blob/faf08625/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterCheckpointStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterCheckpointStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterCheckpointStep.java
new file mode 100644
index 0000000..3e7cd3e
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterCheckpointStep.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.common.CuboidStatsReaderUtil;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class UpdateCubeInfoAfterCheckpointStep extends AbstractExecutable {
+
+    private static final Logger logger = LoggerFactory.getLogger(UpdateCubeInfoAfterCheckpointStep.class);
+
+    public UpdateCubeInfoAfterCheckpointStep() {
+        super();
+    }
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
+        final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+
+        Set<Long> recommendCuboids = cube.getCuboidsRecommend();
+        try {
+            List<CubeSegment> newSegments = cube.getSegments(SegmentStatusEnum.READY_PENDING);
+            Map<Long, Long> recommendCuboidsWithStats = CuboidStatsReaderUtil
+                    .readCuboidStatsFromSegments(recommendCuboids, newSegments);
+            if (recommendCuboidsWithStats == null) {
+                throw new RuntimeException("Fail to get statistics info for recommended cuboids after optimization!!!");
+            }
+            cubeManager.promoteCheckpointOptimizeSegments(cube, recommendCuboidsWithStats,
+                    newSegments.toArray(new CubeSegment[newSegments.size()]));
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+        } catch (Exception e) {
+            logger.error("fail to update cube after build", e);
+            return new ExecuteResult(e, e.getLocalizedMessage());
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/faf08625/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java
new file mode 100644
index 0000000..965111b
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class UpdateCubeInfoAfterOptimizeStep extends AbstractExecutable {
+
+    private static final Logger logger = LoggerFactory.getLogger(UpdateCubeInfoAfterOptimizeStep.class);
+
+    public UpdateCubeInfoAfterOptimizeStep() {
+        super();
+    }
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
+        final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+        final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
+
+        CubeSegment originalSegment = cube.getOriginalSegmentToOptimize(segment);
+        long sourceCount = originalSegment.getInputRecords();
+        long sourceSizeBytes = originalSegment.getInputRecordsSize();
+
+        CubingJob cubingJob = (CubingJob) getManager().getJob(CubingExecutableUtil.getCubingJobId(this.getParams()));
+        long cubeSizeBytes = cubingJob.findCubeSizeBytes();
+
+        segment.setLastBuildJobID(CubingExecutableUtil.getCubingJobId(this.getParams()));
+        segment.setLastBuildTime(System.currentTimeMillis());
+        segment.setSizeKB(cubeSizeBytes / 1024);
+        segment.setInputRecords(sourceCount);
+        segment.setInputRecordsSize(sourceSizeBytes);
+
+        try {
+            cubeManager.promoteNewlyOptimizeSegments(cube, segment);
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+        } catch (IOException e) {
+            logger.error("fail to update cube after build", e);
+            return new ExecuteResult(e, e.getLocalizedMessage());
+        }
+    }
+
+}
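
A note for readers following the new flow: a step like UpdateCubeInfoAfterOptimizeStep is meant to be appended to a chained optimize job with its parameters carried in the executable's params map. The sketch below is illustrative only and not part of this patch; the real assembly lives in BatchOptimizeJobBuilder2 (not shown in this commit), and the step name used here is hypothetical.

    import org.apache.kylin.cube.CubeSegment;
    import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
    import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterOptimizeStep;
    import org.apache.kylin.job.execution.DefaultChainedExecutable;

    public class OptimizeFlowSketch {
        // Appends the new step to a chained job; the parameter keys travel in the step's
        // params map and are read back in doWork() via the CubingExecutableUtil getters.
        static void addUpdateCubeInfoStep(DefaultChainedExecutable jobFlow, CubeSegment optimizeSegment) {
            UpdateCubeInfoAfterOptimizeStep updateStep = new UpdateCubeInfoAfterOptimizeStep();
            updateStep.setName("Update Cube Info"); // hypothetical step name
            CubingExecutableUtil.setCubeName(optimizeSegment.getRealization().getName(), updateStep.getParams());
            CubingExecutableUtil.setSegmentId(optimizeSegment.getUuid(), updateStep.getParams());
            CubingExecutableUtil.setCubingJobId(jobFlow.getId(), updateStep.getParams());
            jobFlow.addTask(updateStep);
        }
    }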

http://git-wip-us.apache.org/repos/asf/kylin/blob/faf08625/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardJob.java
new file mode 100644
index 0000000..0cd7264
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardJob.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+public class UpdateOldCuboidShardJob extends AbstractHadoopJob {
+
+    private static final Logger logger = LoggerFactory.getLogger(UpdateOldCuboidShardJob.class);
+
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+        try {
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_SEGMENT_ID);
+            options.addOption(OPTION_INPUT_PATH);
+            options.addOption(OPTION_OUTPUT_PATH);
+            parseOptions(options, args);
+
+            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+            String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
+            String segmentID = getOptionValue(OPTION_SEGMENT_ID);
+            Path input = new Path(getOptionValue(OPTION_INPUT_PATH));
+            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+
+            CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+            CubeInstance cube = cubeMgr.getCube(cubeName);
+            CubeSegment optSegment = cube.getSegmentById(segmentID);
+            CubeSegment originalSegment = cube.getOriginalSegmentToOptimize(optSegment);
+
+            logger.info("Starting: " + job.getJobName());
+
+            setJobClasspath(job, cube.getConfig());
+
+            // Mapper
+            job.setMapperClass(UpdateOldCuboidShardMapper.class);
+
+            // Reducer
+            job.setNumReduceTasks(0);
+
+            job.setOutputKeyClass(Text.class);
+            job.setOutputValueClass(Text.class);
+
+            // Input
+            job.setInputFormatClass(SequenceFileInputFormat.class);
+            FileInputFormat.setInputPaths(job, input);
+            // Output
+            job.setOutputFormatClass(SequenceFileOutputFormat.class);
+            FileOutputFormat.setOutputPath(job, output);
+
+            // set job configuration
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
+            // add metadata to distributed cache
+            attachSegmentsMetadataWithDict(Lists.newArrayList(optSegment, originalSegment), job.getConfiguration());
+
+            this.deletePath(job.getConfiguration(), output);
+
+            return waitForCompletion(job);
+        } catch (Exception e) {
+            logger.error("error in CuboidJob", e);
+            printUsage(options);
+            throw e;
+        } finally {
+            if (job != null)
+                cleanupTempConfFile(job.getConfiguration());
+        }
+    }
+
+}
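
As a usage note, UpdateOldCuboidShardJob is a standard AbstractHadoopJob, so it would normally be launched through Hadoop's ToolRunner with the options registered in run(). The snippet below is only a hypothetical invocation sketch; the flag names are assumed to follow AbstractHadoopJob's usual option names, and the cube name, segment UUID, and HDFS paths are placeholders.

    import org.apache.hadoop.util.ToolRunner;
    import org.apache.kylin.engine.mr.steps.UpdateOldCuboidShardJob;

    public class RunUpdateOldCuboidShard {
        public static void main(String[] args) throws Exception {
            // All values below are placeholders for illustration only.
            String[] jobArgs = new String[] {
                    "-jobname", "Kylin_Update_Old_Cuboid_Shard_sample_cube",
                    "-cubename", "sample_cube",
                    "-segmentid", "0a1b2c3d-placeholder-segment-id",
                    "-input", "hdfs:///kylin/example/old_cuboid_in",
                    "-output", "hdfs:///kylin/example/old_cuboid_out" };
            int exitCode = ToolRunner.run(new UpdateOldCuboidShardJob(), jobArgs);
            System.exit(exitCode);
        }
    }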

http://git-wip-us.apache.org/repos/asf/kylin/blob/faf08625/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardMapper.java
new file mode 100644
index 0000000..cf3c29e
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardMapper.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import static org.apache.kylin.engine.mr.JobBuilderSupport.PathNameCuboidBase;
+import static org.apache.kylin.engine.mr.JobBuilderSupport.PathNameCuboidOld;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.SplittedBytes;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.common.RowKeySplitter;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.kv.RowKeyEncoder;
+import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UpdateOldCuboidShardMapper extends KylinMapper<Text, Text, Text, Text> {
+
+    private static final Logger logger = LoggerFactory.getLogger(UpdateOldCuboidShardMapper.class);
+
+    private MultipleOutputs mos;
+    private long baseCuboid;
+
+    private CubeDesc cubeDesc;
+    private RowKeySplitter rowKeySplitter;
+    private RowKeyEncoderProvider rowKeyEncoderProvider;
+
+    private Text outputKey = new Text();
+    private byte[] newKeyBodyBuf = new byte[RowConstants.ROWKEY_BUFFER_SIZE];
+    private ByteArray newKeyBuf = ByteArray.allocate(RowConstants.ROWKEY_BUFFER_SIZE);
+
+    @Override
+    protected void setup(Context context) throws IOException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+        mos = new MultipleOutputs(context);
+
+        String cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME);
+        String segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
+
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+
+        CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
+        CubeSegment cubeSegment = cube.getSegmentById(segmentID);
+        CubeSegment oldSegment = cube.getOriginalSegmentToOptimize(cubeSegment);
+
+        cubeDesc = cube.getDescriptor();
+        baseCuboid = cube.getCuboidScheduler().getBaseCuboidId();
+
+        rowKeySplitter = new RowKeySplitter(oldSegment, 65, 256);
+        rowKeyEncoderProvider = new RowKeyEncoderProvider(cubeSegment);
+    }
+
+    @Override
+    public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException {
+        long cuboidID = rowKeySplitter.split(key.getBytes());
+
+        Cuboid cuboid = Cuboid.findForMandatory(cubeDesc, cuboidID);
+        int fullKeySize = buildKey(cuboid, rowKeySplitter.getSplitBuffers());
+        outputKey.set(newKeyBuf.array(), 0, fullKeySize);
+
+        String baseOutputPath = PathNameCuboidOld;
+        if (cuboidID == baseCuboid) {
+            baseOutputPath = PathNameCuboidBase;
+        }
+        mos.write(outputKey, value, generateFileName(baseOutputPath));
+    }
+
+    private int buildKey(Cuboid cuboid, SplittedBytes[] splitBuffers) {
+        RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(cuboid);
+
+        int startIdx = rowKeySplitter.getBodySplitOffset(); // skip shard and cuboidId
+        int endIdx = startIdx + Long.bitCount(cuboid.getId());
+        int offset = 0;
+        for (int i = startIdx; i < endIdx; i++) {
+            System.arraycopy(splitBuffers[i].value, 0, newKeyBodyBuf, offset, splitBuffers[i].length);
+            offset += splitBuffers[i].length;
+        }
+
+        int fullKeySize = rowkeyEncoder.getBytesLength();
+        while (newKeyBuf.array().length < fullKeySize) {
+            newKeyBuf.set(new byte[newKeyBuf.length() * 2]);
+        }
+        newKeyBuf.set(0, fullKeySize);
+
+        rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, offset), newKeyBuf);
+
+        return fullKeySize;
+    }
+
+    @Override
+    public void doCleanup(Context context) throws IOException, InterruptedException {
+        mos.close();
+
+        Path outputDirBase = new Path(context.getConfiguration().get(FileOutputFormat.OUTDIR), PathNameCuboidBase);
+        FileSystem fs = FileSystem.get(context.getConfiguration());
+        if (!fs.exists(outputDirBase)) {
+            fs.mkdirs(outputDirBase);
+            SequenceFile
+                    .createWriter(context.getConfiguration(),
+                            SequenceFile.Writer.file(new Path(outputDirBase, "part-m-00000")),
+                            SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(Text.class))
+                    .close();
+        }
+    }
+
+    private String generateFileName(String subDir) {
+        return subDir + "/part";
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/faf08625/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine.java
index 08ed207..81d97b4 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine.java
@@ -20,6 +20,7 @@ package org.apache.kylin.engine.spark;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.IBatchCubingEngine;
+import org.apache.kylin.engine.mr.BatchOptimizeJobBuilder2;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 
@@ -46,6 +47,11 @@ public class SparkBatchCubingEngine implements IBatchCubingEngine {
     }
 
     @Override
+    public DefaultChainedExecutable createBatchOptimizeJob(CubeSegment optimizeSegment, String submitter) {
+        return new BatchOptimizeJobBuilder2(optimizeSegment, submitter).build();
+    }
+
+    @Override
     public Class<?> getSourceInterface() {
         return null;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/faf08625/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index a370292..8f821dd 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -39,6 +39,7 @@ import org.apache.kylin.dimension.DimensionEncodingFactory;
 import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.job.JobInstance;
 import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.job.exception.JobException;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.ISourceAware;
 import org.apache.kylin.metadata.project.ProjectInstance;
@@ -50,6 +51,7 @@ import org.apache.kylin.rest.exception.NotFoundException;
 import org.apache.kylin.rest.request.CubeRequest;
 import org.apache.kylin.rest.request.JobBuildRequest;
 import org.apache.kylin.rest.request.JobBuildRequest2;
+import org.apache.kylin.rest.request.JobOptimizeRequest;
 import org.apache.kylin.rest.response.GeneralResponse;
 import org.apache.kylin.rest.response.HBaseResponse;
 import org.apache.kylin.rest.service.CubeService;
@@ -307,6 +309,64 @@ public class CubeController extends BasicController {
         }
     }
 
+    /**
+     * Send an optimize cube job
+     *
+     * @param cubeName Cube ID
+     * @return JobInstance of CheckpointExecutable
+     */
+    @RequestMapping(value = "/{cubeName}/optimize", method = { RequestMethod.PUT })
+    @ResponseBody
+    public JobInstance optimize(@PathVariable String cubeName, @RequestBody JobOptimizeRequest jobOptimizeRequest) {
+        try {
+            String submitter = SecurityContextHolder.getContext().getAuthentication().getName();
+            CubeInstance cube = jobService.getCubeManager().getCube(cubeName);
+
+            if (cube == null) {
+                throw new InternalErrorException("Cannot find cube " + cubeName);
+            }
+            logger.info("cuboid recommend:" + jobOptimizeRequest.getCuboidsRecommend());
+            return jobService.submitOptimizeJob(cube, jobOptimizeRequest.getCuboidsRecommend(), submitter).getFirst();
+        } catch (JobException e) {
+            logger.error(e.getLocalizedMessage(), e);
+            throw new BadRequestException(e.getLocalizedMessage());
+        } catch (Exception e) {
+            logger.error(e.getLocalizedMessage(), e);
+            throw new InternalErrorException(e.getLocalizedMessage());
+        }
+    }
+
+    /**
+     * Send an optimize cube segment job
+     *
+     * @param cubeName Cube ID
+     * @param segmentID for segment to be optimized
+     */
+    @RequestMapping(value = "/{cubeName}/recover_segment_optimize/{segmentID}", method = { RequestMethod.PUT })
+    @ResponseBody
+    public JobInstance recoverSegmentOptimize(@PathVariable String cubeName, @PathVariable String segmentID) {
+        try {
+            String submitter = SecurityContextHolder.getContext().getAuthentication().getName();
+            CubeInstance cube = jobService.getCubeManager().getCube(cubeName);
+            if (cube == null) {
+                throw new InternalErrorException("Cannot find cube " + cubeName);
+            }
+
+            CubeSegment segment = cube.getSegmentById(segmentID);
+            if (segment == null) {
+                throw new InternalErrorException("Cannot find segment '" + segmentID + "'");
+            }
+
+            return jobService.submitRecoverSegmentOptimizeJob(segment, submitter);
+        } catch (JobException e) {
+            logger.error(e.getLocalizedMessage(), e);
+            throw new BadRequestException(e.getLocalizedMessage());
+        } catch (Exception e) {
+            logger.error(e.getLocalizedMessage(), e);
+            throw new InternalErrorException(e.getLocalizedMessage());
+        }
+    }
+
     @RequestMapping(value = "/{cubeName}/disable", method = { RequestMethod.PUT }, produces = { "application/json" })
     @ResponseBody
     public CubeInstance disableCube(@PathVariable String cubeName) {
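
For reference, the new PUT endpoint above takes a JSON body that maps onto JobOptimizeRequest (next file), and, per JobService.checkAllowOptimization later in this commit, cuboidsRecommend must include the base cuboid and differ from the current cuboid set. The client sketch below is only illustrative; the /kylin/api/cubes prefix, port, credentials, cube name, and cuboid IDs are assumptions, not part of this patch.

    import java.io.OutputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;
    import java.util.Base64;

    public class OptimizeCubeClient {
        public static void main(String[] args) throws Exception {
            // Assumed base URL and basic-auth credentials; adjust for a real deployment.
            URL url = new URL("http://localhost:7070/kylin/api/cubes/my_cube/optimize");
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("PUT");
            conn.setRequestProperty("Content-Type", "application/json");
            conn.setRequestProperty("Authorization",
                    "Basic " + Base64.getEncoder().encodeToString("ADMIN:KYLIN".getBytes(StandardCharsets.UTF_8)));
            conn.setDoOutput(true);
            // Body deserializes into JobOptimizeRequest; the IDs here are made up.
            String body = "{\"cuboidsRecommend\": [255, 239, 175, 111]}";
            try (OutputStream os = conn.getOutputStream()) {
                os.write(body.getBytes(StandardCharsets.UTF_8));
            }
            System.out.println("HTTP " + conn.getResponseCode());
        }
    }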

http://git-wip-us.apache.org/repos/asf/kylin/blob/faf08625/server-base/src/main/java/org/apache/kylin/rest/request/JobOptimizeRequest.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/request/JobOptimizeRequest.java b/server-base/src/main/java/org/apache/kylin/rest/request/JobOptimizeRequest.java
new file mode 100644
index 0000000..51e8e7c
--- /dev/null
+++ b/server-base/src/main/java/org/apache/kylin/rest/request/JobOptimizeRequest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.request;
+
+import java.util.Set;
+
+public class JobOptimizeRequest {
+
+    private Set<Long> cuboidsRecommend;
+
+    public Set<Long> getCuboidsRecommend() {
+        return cuboidsRecommend;
+    }
+
+    public void setCuboidsRecommend(Set<Long> cuboidsRecommend) {
+        this.cuboidsRecommend = cuboidsRecommend;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/faf08625/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
index 4820ccd..93af53b 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -70,6 +70,8 @@ import org.springframework.security.core.context.SecurityContextHolder;
 import org.springframework.stereotype.Component;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 /**
  * Stateless & lightweight service facade of cube management functions.
@@ -509,6 +511,8 @@ public class CubeService extends BasicService {
 
         CubeUpdate update = new CubeUpdate(cube);
         update.setToRemoveSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()]));
+        update.setCuboids(Maps.<Long, Long> newHashMap());
+        update.setCuboidsRecommend(Sets.<Long> newHashSet());
         CubeManager.getInstance(getConfig()).updateCube(update);
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/faf08625/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 432d300..a437934 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -34,11 +34,13 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.directory.api.util.Strings;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.CubeUpdate;
 import org.apache.kylin.cube.model.CubeBuildTypeEnum;
 import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.mr.BatchOptimizeJobCheckpointBuilder;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.common.JobInfoConverter;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
@@ -48,6 +50,7 @@ import org.apache.kylin.job.SchedulerFactory;
 import org.apache.kylin.job.constant.JobStatusEnum;
 import org.apache.kylin.job.constant.JobTimeFilterEnum;
 import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.exception.JobException;
 import org.apache.kylin.job.exception.SchedulerException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.CheckpointExecutable;
@@ -71,6 +74,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.context.annotation.EnableAspectJAutoProxy;
 import org.springframework.security.access.prepost.PreAuthorize;
+import org.springframework.security.core.context.SecurityContextHolder;
 import org.springframework.stereotype.Component;
 
 import com.google.common.base.Function;
@@ -271,6 +275,142 @@ public class JobService extends BasicService implements InitializingBean {
         return jobInstance;
     }
 
+    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN
+            + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'OPERATION') or hasPermission(#cube, 'MANAGEMENT')")
+    public Pair<JobInstance, List<JobInstance>> submitOptimizeJob(CubeInstance cube, Set<Long> cuboidsRecommend,
+            String submitter) throws IOException, JobException {
+
+        Pair<JobInstance, List<JobInstance>> result = submitOptimizeJobInternal(cube, cuboidsRecommend, submitter);
+        accessService.init(result.getFirst(), null);
+        accessService.inherit(result.getFirst(), cube);
+        for (JobInstance jobInstance : result.getSecond()) {
+            accessService.init(jobInstance, null);
+            accessService.inherit(jobInstance, cube);
+        }
+
+        return result;
+    }
+
+    private Pair<JobInstance, List<JobInstance>> submitOptimizeJobInternal(CubeInstance cube,
+            Set<Long> cuboidsRecommend, String submitter) throws IOException {
+        Message msg = MsgPicker.getMsg();
+
+        if (cube.getStatus() == RealizationStatusEnum.DESCBROKEN) {
+            throw new BadRequestException(String.format(msg.getBUILD_BROKEN_CUBE(), cube.getName()));
+        }
+
+        checkCubeDescSignature(cube);
+        checkAllowOptimization(cube, cuboidsRecommend);
+
+        CubeSegment[] optimizeSegments = null;
+        try {
+            /** Add optimize segments */
+            optimizeSegments = getCubeManager().optimizeSegments(cube, cuboidsRecommend);
+            List<JobInstance> optimizeJobInstances = Lists.newLinkedList();
+
+            /** Add optimize jobs */
+            List<AbstractExecutable> optimizeJobList = Lists.newArrayListWithExpectedSize(optimizeSegments.length);
+            for (CubeSegment optimizeSegment : optimizeSegments) {
+                DefaultChainedExecutable optimizeJob = EngineFactory.createBatchOptimizeJob(optimizeSegment, submitter);
+                getExecutableManager().addJob(optimizeJob);
+
+                optimizeJobList.add(optimizeJob);
+                optimizeJobInstances.add(getSingleJobInstance(optimizeJob));
+            }
+
+            /** Add checkpoint job for batch jobs */
+            CheckpointExecutable checkpointJob = new BatchOptimizeJobCheckpointBuilder(cube, submitter).build();
+            checkpointJob.addTaskListForCheck(optimizeJobList);
+
+            getExecutableManager().addJob(checkpointJob);
+
+            return new Pair(getCheckpointJobInstance(checkpointJob), optimizeJobInstances);
+        } catch (Exception e) {
+            if (optimizeSegments != null) {
+                logger.error("Job submission might failed for NEW segments {}, will clean the NEW segments from cube",
+                        optimizeSegments);
+                try {
+                    // Remove this segments
+                    CubeUpdate cubeBuilder = new CubeUpdate(cube);
+                    cubeBuilder.setToRemoveSegs(optimizeSegments);
+                    getCubeManager().updateCube(cubeBuilder);
+                } catch (Exception ee) {
+                    // swallow the exception
+                    logger.error("Clean New segments failed, ignoring it", e);
+                }
+            }
+            throw e;
+        }
+    }
+
+    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN
+            + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'OPERATION') or hasPermission(#cube, 'MANAGEMENT')")
+    public JobInstance submitRecoverSegmentOptimizeJob(CubeSegment segment, String submitter)
+            throws IOException, JobException {
+        CubeInstance cubeInstance = segment.getCubeInstance();
+
+        checkCubeDescSignature(cubeInstance);
+
+        String cubeName = cubeInstance.getName();
+        List<JobInstance> jobInstanceList = searchJobsByCubeName(cubeName, null,
+                Lists.newArrayList(JobStatusEnum.NEW, JobStatusEnum.PENDING, JobStatusEnum.ERROR),
+                JobTimeFilterEnum.ALL, JobSearchMode.CHECKPOINT_ONLY);
+        if (jobInstanceList.size() > 1) {
+            throw new IllegalStateException("Exist more than one CheckpointExecutable for cube " + cubeName);
+        } else if (jobInstanceList.size() == 0) {
+            throw new IllegalStateException("There's no CheckpointExecutable for cube " + cubeName);
+        }
+        CheckpointExecutable checkpointExecutable = (CheckpointExecutable) getExecutableManager()
+                .getJob(jobInstanceList.get(0).getId());
+
+        AbstractExecutable toBeReplaced = null;
+        for (AbstractExecutable taskForCheck : checkpointExecutable.getSubTasksForCheck()) {
+            if (taskForCheck instanceof CubingJob) {
+                CubingJob subCubingJob = (CubingJob) taskForCheck;
+                String segmentName = CubingExecutableUtil.getSegmentName(subCubingJob.getParams());
+                if (segmentName != null && segmentName.equals(segment.getName())) {
+                    String segmentID = CubingExecutableUtil.getSegmentId(subCubingJob.getParams());
+                    CubeSegment beingOptimizedSegment = cubeInstance.getSegmentById(segmentID);
+                    if (beingOptimizedSegment != null) { // beingOptimizedSegment exists & should not be recovered
+                        throw new IllegalStateException("Segment " + beingOptimizedSegment.getName() + "-"
+                                + beingOptimizedSegment.getUuid()
+                                + " still exists. Please delete it or discard the related optimize job first!!!");
+                    }
+                    toBeReplaced = taskForCheck;
+                    break;
+                }
+            }
+        }
+        if (toBeReplaced == null) {
+            throw new IllegalStateException("There's no CubingJob for segment " + segment.getName()
+                    + " in CheckpointExecutable " + checkpointExecutable.getName());
+        }
+
+        /** Add CubingJob for the related segment **/
+        CubeSegment optimizeSegment = getCubeManager().appendSegment(cubeInstance, segment.getDateRangeStart(),
+                segment.getDateRangeEnd());
+        CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance);
+        cubeBuilder.setToAddSegs(optimizeSegment);
+        getCubeManager().updateCube(cubeBuilder);
+
+        DefaultChainedExecutable optimizeJob = EngineFactory.createBatchOptimizeJob(optimizeSegment, submitter);
+
+        getExecutableManager().addJob(optimizeJob);
+
+        JobInstance optimizeJobInstance = getSingleJobInstance(optimizeJob);
+        accessService.init(optimizeJobInstance, null);
+        accessService.inherit(optimizeJobInstance, cubeInstance);
+
+        /** Update the checkpoint job */
+        checkpointExecutable.getSubTasksForCheck().set(checkpointExecutable.getSubTasksForCheck().indexOf(toBeReplaced),
+                optimizeJob);
+
+        getExecutableManager().updateCheckpointJob(checkpointExecutable.getId(),
+                checkpointExecutable.getSubTasksForCheck());
+
+        return optimizeJobInstance;
+    }
+
     private void checkCubeDescSignature(CubeInstance cube) {
         Message msg = MsgPicker.getMsg();
 
@@ -279,8 +419,25 @@ public class JobService extends BasicService implements InitializingBean {
                     String.format(msg.getINCONSISTENT_CUBE_DESC_SIGNATURE(), cube.getDescriptor()));
     }
 
+    private void checkAllowOptimization(CubeInstance cube, Set<Long> cuboidsRecommend) {
+        long baseCuboid = cube.getCuboidScheduler().getBaseCuboidId();
+        if (!cuboidsRecommend.contains(baseCuboid)) {
+            throw new BadRequestException("The recommend cuboids should contain the base cuboid " + baseCuboid);
+        }
+        Set<Long> currentCuboidSet = cube.getCuboidScheduler().getAllCuboidIds();
+        if (currentCuboidSet.equals(cuboidsRecommend)) {
+            throw new BadRequestException(
+                    "The recommend cuboids are the same as the current cuboids. It's no need to do optimization.");
+        }
+    }
+
     public JobInstance getJobInstance(String uuid) {
-        return getSingleJobInstance(getExecutableManager().getJob(uuid));
+        AbstractExecutable job = getExecutableManager().getJob(uuid);
+        if (job instanceof CheckpointExecutable) {
+            return getCheckpointJobInstance(job);
+        } else {
+            return getSingleJobInstance(job);
+        }
     }
 
     public Output getOutput(String id) {
@@ -362,21 +519,88 @@ public class JobService extends BasicService implements InitializingBean {
             getExecutableManager().discardJob(job.getId());
             return job;
         }
-        CubeInstance cubeInstance = getCubeManager().getCube(job.getRelatedCube());
+
+        logger.info("Cancel job [" + job.getId() + "] trigger by "
+                + SecurityContextHolder.getContext().getAuthentication().getName());
+        if (job.getStatus() == JobStatusEnum.FINISHED) {
+            throw new IllegalStateException(
+                    "The job " + job.getId() + " has already been finished and cannot be discarded.");
+        }
+        if (job.getStatus() == JobStatusEnum.DISCARDED) {
+            return job;
+        }
+
+        AbstractExecutable executable = getExecutableManager().getJob(job.getId());
+        if (executable instanceof CubingJob) {
+            cancelCubingJobInner((CubingJob) executable);
+        } else if (executable instanceof CheckpointExecutable) {
+            cancelCheckpointJobInner((CheckpointExecutable) executable);
+        } else {
+            getExecutableManager().discardJob(executable.getId());
+        }
+        return job;
+    }
+
+    private void cancelCubingJobInner(CubingJob cubingJob) throws IOException {
+        CubeInstance cubeInstance = getCubeManager().getCube(CubingExecutableUtil.getCubeName(cubingJob.getParams()));
         // might not a cube job
-        final String segmentIds = job.getRelatedSegment();
-        for (String segmentId : StringUtils.split(segmentIds)) {
-            final CubeSegment segment = cubeInstance.getSegmentById(segmentId);
-            if (segment != null && (segment.getStatus() == SegmentStatusEnum.NEW || segment.getDateRangeEnd() == 0)) {
-                // Remove this segments
+        final String segmentIds = CubingExecutableUtil.getSegmentId(cubingJob.getParams());
+        if (!StringUtils.isEmpty(segmentIds)) {
+            List<CubeSegment> toRemoveSegments = Lists.newLinkedList();
+            for (String segmentId : StringUtils.split(segmentIds)) {
+                final CubeSegment segment = cubeInstance.getSegmentById(segmentId);
+                if (segment != null && segment.getStatus() != SegmentStatusEnum.READY) {
+                    toRemoveSegments.add(segment);
+                }
+            }
+            if (!toRemoveSegments.isEmpty()) {
                 CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance);
-                cubeBuilder.setToRemoveSegs(segment);
+                cubeBuilder.setToRemoveSegs(toRemoveSegments.toArray(new CubeSegment[toRemoveSegments.size()]));
                 getCubeManager().updateCube(cubeBuilder);
             }
         }
-        getExecutableManager().discardJob(job.getId());
+        getExecutableManager().discardJob(cubingJob.getId());
+    }
 
-        return job;
+    private void cancelCheckpointJobInner(CheckpointExecutable checkpointExecutable) throws IOException {
+        List<String> segmentIdList = Lists.newLinkedList();
+        List<String> jobIdList = Lists.newLinkedList();
+        jobIdList.add(checkpointExecutable.getId());
+        setRelatedIdList(checkpointExecutable, segmentIdList, jobIdList);
+
+        CubeInstance cubeInstance = getCubeManager()
+                .getCube(CubingExecutableUtil.getCubeName(checkpointExecutable.getParams()));
+        if (!segmentIdList.isEmpty()) {
+            List<CubeSegment> toRemoveSegments = Lists.newLinkedList();
+            for (String segmentId : segmentIdList) {
+                final CubeSegment segment = cubeInstance.getSegmentById(segmentId);
+                if (segment != null && segment.getStatus() != SegmentStatusEnum.READY) {
+                    toRemoveSegments.add(segment);
+                }
+            }
+
+            CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance);
+            cubeBuilder.setToRemoveSegs(toRemoveSegments.toArray(new CubeSegment[toRemoveSegments.size()]));
+            cubeBuilder.setCuboidsRecommend(Sets.<Long> newHashSet()); //Set recommend cuboids to be null
+            getCubeManager().updateCube(cubeBuilder);
+        }
+
+        for (String jobId : jobIdList) {
+            getExecutableManager().discardJob(jobId);
+        }
+    }
+
+    private void setRelatedIdList(CheckpointExecutable checkpointExecutable, List<String> segmentIdList,
+            List<String> jobIdList) {
+        for (AbstractExecutable taskForCheck : checkpointExecutable.getSubTasksForCheck()) {
+            jobIdList.add(taskForCheck.getId());
+            if (taskForCheck instanceof CubingJob) {
+                segmentIdList.addAll(Lists
+                        .newArrayList(StringUtils.split(CubingExecutableUtil.getSegmentId(taskForCheck.getParams()))));
+            } else if (taskForCheck instanceof CheckpointExecutable) {
+                setRelatedIdList((CheckpointExecutable) taskForCheck, segmentIdList, jobIdList);
+            }
+        }
     }
 
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN

http://git-wip-us.apache.org/repos/asf/kylin/blob/faf08625/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index fc52701..f2e2ddd 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.conf.Configuration;
@@ -66,6 +67,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
     CubeInstance cube = null;
     CubeDesc cubeDesc = null;
     String segmentID = null;
+    String cuboidModeName = null;
     KylinConfig kylinConfig;
     Path partitionFilePath;
 
@@ -77,6 +79,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
         options.addOption(OPTION_SEGMENT_ID);
         options.addOption(OPTION_PARTITION_FILE_PATH);
         options.addOption(OPTION_STATISTICS_ENABLED);
+        options.addOption(OPTION_CUBOID_MODE);
         parseOptions(options, args);
 
         partitionFilePath = new Path(getOptionValue(OPTION_PARTITION_FILE_PATH));
@@ -88,13 +91,27 @@ public class CreateHTableJob extends AbstractHadoopJob {
         cubeDesc = cube.getDescriptor();
         kylinConfig = cube.getConfig();
         segmentID = getOptionValue(OPTION_SEGMENT_ID);
+        cuboidModeName = getOptionValue(OPTION_CUBOID_MODE);
         CubeSegment cubeSegment = cube.getSegmentById(segmentID);
 
         Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
 
         byte[][] splitKeys;
         if (statsEnabled) {
-            final Map<Long, Double> cuboidSizeMap = new CubeStatsReader(cubeSegment, kylinConfig).getCuboidSizeMap();
+            Map<Long, Double> cuboidSizeMap = new CubeStatsReader(cubeSegment, null, kylinConfig).getCuboidSizeMap();
+            Set<Long> buildingCuboids = cube.getCuboidsByMode(cuboidModeName);
+            if (buildingCuboids != null && !buildingCuboids.isEmpty()) {
+                Map<Long, Double> optimizedCuboidSizeMap = Maps.newHashMapWithExpectedSize(buildingCuboids.size());
+                for (Long cuboid : buildingCuboids) {
+                    Double cuboidSize = cuboidSizeMap.get(cuboid);
+                    if (cuboidSize == null) {
+                        logger.warn(cuboid + "cuboid's size is null will replace by 0");
+                        cuboidSize = 0.0;
+                    }
+                    optimizedCuboidSizeMap.put(cuboid, cuboidSize);
+                }
+                cuboidSizeMap = optimizedCuboidSizeMap;
+            }
             splitKeys = getRegionSplitsFromCuboidStatistics(cuboidSizeMap, kylinConfig, cubeSegment, partitionFilePath.getParent());
         } else {
             splitKeys = getRegionSplits(conf, partitionFilePath);

http://git-wip-us.apache.org/repos/asf/kylin/blob/faf08625/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
index 31cb189..e1cf7e0 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
@@ -31,13 +31,15 @@ import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidModeEnum;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.engine.mr.IMROutput2;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.MapReduceUtil;
 import org.apache.kylin.engine.mr.steps.HiveToBaseCuboidMapper;
 import org.apache.kylin.engine.mr.steps.InMemCuboidMapper;
 import org.apache.kylin.engine.mr.steps.MergeCuboidJob;
 import org.apache.kylin.engine.mr.steps.NDCuboidMapper;
-import org.apache.kylin.engine.mr.steps.ReducerNumSizing;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -93,13 +95,15 @@ public class HBaseMROutput2Transition implements IMROutput2 {
         }
 
         @Override
-        public void configureJobOutput(Job job, String output, CubeSegment segment, int level) throws Exception {
+        public void configureJobOutput(Job job, String output, CubeSegment segment, CuboidScheduler cuboidScheduler,
+                int level) throws Exception {
             int reducerNum = 1;
             Class mapperClass = job.getMapperClass();
             if (mapperClass == HiveToBaseCuboidMapper.class || mapperClass == NDCuboidMapper.class) {
-                reducerNum = ReducerNumSizing.getLayeredCubingReduceTaskNum(segment, AbstractHadoopJob.getTotalMapInputMB(job), level);
+                reducerNum = MapReduceUtil.getLayeredCubingReduceTaskNum(segment, cuboidScheduler,
+                        AbstractHadoopJob.getTotalMapInputMB(job), level);
             } else if (mapperClass == InMemCuboidMapper.class) {
-                reducerNum = ReducerNumSizing.getInmemCubingReduceTaskNum(segment);
+                reducerNum = MapReduceUtil.getInmemCubingReduceTaskNum(segment, cuboidScheduler);
             }
             Path outputPath = new Path(output);
             FileOutputFormat.setOutputPath(job, outputPath);
@@ -149,7 +153,8 @@ public class HBaseMROutput2Transition implements IMROutput2 {
 
         @Override
         public void configureJobOutput(Job job, String output, CubeSegment 
segment) throws Exception {
-            int reducerNum = 
ReducerNumSizing.getLayeredCubingReduceTaskNum(segment, 
AbstractHadoopJob.getTotalMapInputMB(job), -1);
+            int reducerNum = 
MapReduceUtil.getLayeredCubingReduceTaskNum(segment, 
segment.getCuboidScheduler(),
+                    AbstractHadoopJob.getTotalMapInputMB(job), -1);
             job.setNumReduceTasks(reducerNum);
 
             Path outputPath = new Path(output);
@@ -185,4 +190,26 @@ public class HBaseMROutput2Transition implements IMROutput2 {
             throw new IllegalStateException("No merging segment's last build job ID equals " + jobID);
         }
     }
+
+    public IMRBatchOptimizeOutputSide2 getBatchOptimizeOutputSide(final CubeSegment seg) {
+        return new IMRBatchOptimizeOutputSide2() {
+            HBaseMRSteps steps = new HBaseMRSteps(seg);
+
+            @Override
+            public void addStepPhase1_CreateHTable(DefaultChainedExecutable jobFlow) {
+                jobFlow.addTask(steps.createCreateHTableStepWithStats(jobFlow.getId(), CuboidModeEnum.RECOMMEND));
+            }
+
+            @Override
+            public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow) {
+                jobFlow.addTask(steps.createConvertCuboidToHfileStep(jobFlow.getId()));
+                jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
+            }
+
+            @Override
+            public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow) {
+                steps.addOptimizeGarbageCollectionSteps(jobFlow);
+            }
+        };
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/faf08625/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
index 6f69e8c..41e80e3 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidModeEnum;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
@@ -32,6 +33,7 @@ import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 
 import com.google.common.base.Preconditions;
@@ -82,7 +84,15 @@ public class HBaseMRSteps extends JobBuilderSupport {
         return createCreateHTableStep(jobId, true);
     }
 
+    public HadoopShellExecutable createCreateHTableStepWithStats(String jobId, CuboidModeEnum cuboidMode) {
+        return createCreateHTableStep(jobId, true, cuboidMode);
+    }
+
     private HadoopShellExecutable createCreateHTableStep(String jobId, boolean withStats) {
+        return createCreateHTableStep(jobId, withStats, CuboidModeEnum.CURRENT);
+    }
+
+    private HadoopShellExecutable createCreateHTableStep(String jobId, boolean withStats, CuboidModeEnum cuboidMode) {
         HadoopShellExecutable createHtableStep = new HadoopShellExecutable();
         createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
         StringBuilder cmd = new StringBuilder();
@@ -90,6 +100,7 @@ public class HBaseMRSteps extends JobBuilderSupport {
         appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
         appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION, getRowkeyDistributionOutputPath(jobId) + "/part-r-00000");
         appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_ENABLED, String.valueOf(withStats));
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBOID_MODE, cuboidMode.toString());
 
         createHtableStep.setJobParams(cmd.toString());
         createHtableStep.setJobClass(CreateHTableJob.class);
@@ -167,6 +178,35 @@ public class HBaseMRSteps extends JobBuilderSupport {
         return result;
     }
 
+    public MergeGCStep createOptimizeGCStep() {
+        MergeGCStep result = new MergeGCStep();
+        result.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
+        result.setOldHTables(getOptimizeHTables());
+        return result;
+    }
+
+    public List<CubeSegment> getOptimizeSegments() {
+        CubeInstance cube = (CubeInstance) seg.getRealization();
+        List<CubeSegment> newSegments = Lists.newArrayList(cube.getSegments(SegmentStatusEnum.READY_PENDING));
+        List<CubeSegment> oldSegments = Lists.newArrayListWithExpectedSize(newSegments.size());
+        for (CubeSegment segment : newSegments) {
+            oldSegments.add(cube.getOriginalSegmentToOptimize(segment));
+        }
+        return oldSegments;
+    }
+
+    public List<String> getOptimizeHTables() {
+        return getOldHTables(getOptimizeSegments());
+    }
+
+    public List<String> getOldHTables(final List<CubeSegment> oldSegments) {
+        final List<String> oldHTables = Lists.newArrayListWithExpectedSize(oldSegments.size());
+        for (CubeSegment segment : oldSegments) {
+            oldHTables.add(segment.getStorageLocationIdentifier());
+        }
+        return oldHTables;
+    }
+
     public List<String> getMergingHTables() {
         final List<CubeSegment> mergingSegments = ((CubeInstance) seg.getRealization()).getMergingSegments((CubeSegment) seg);
         Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge, target segment " + seg);
@@ -187,6 +227,18 @@ public class HBaseMRSteps extends JobBuilderSupport {
         return mergingHDFSPaths;
     }
 
+    public List<String> getOptimizeHDFSPaths() {
+        return getOldHDFSPaths(getOptimizeSegments());
+    }
+
+    public List<String> getOldHDFSPaths(final List<CubeSegment> oldSegments) {
+        final List<String> oldHDFSPaths = Lists.newArrayListWithExpectedSize(oldSegments.size());
+        for (CubeSegment oldSegment : oldSegments) {
+            oldHDFSPaths.add(getJobWorkingDir(oldSegment.getLastBuildJobID()));
+        }
+        return oldHDFSPaths;
+    }
+
     public String getHFilePath(String jobId) {
         return HBaseConnection.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getRealization().getName() + "/hfile/");
     }
@@ -195,6 +247,22 @@ public class HBaseMRSteps extends JobBuilderSupport {
         return HBaseConnection.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getRealization().getName() + "/rowkey_stats");
     }
 
+    public void addOptimizeGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
+        String jobId = jobFlow.getId();
+
+        jobFlow.addTask(createOptimizeGCStep());
+
+        List<String> toDeletePaths = new ArrayList<>();
+        toDeletePaths.addAll(getOptimizeHDFSPaths());
+
+        HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
+        step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HDFS);
+        step.setDeletePaths(toDeletePaths);
+        step.setJobId(jobId);
+
+        jobFlow.addTask(step);
+    }
+
     public void addMergingGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
         String jobId = jobFlow.getId();
 

Reply via email to