APACHE-KYLIN-2731: Introduce checkpoint executable

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c0a785d4
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c0a785d4
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c0a785d4

Branch: refs/heads/yaho-cube-planner
Commit: c0a785d467ea8254705b20c4433e9f60933114a1
Parents: abf4a97
Author: Zhong <nju_y...@apache.org>
Authored: Fri Aug 25 11:27:25 2017 +0800
Committer: Zhong <nju_y...@apache.org>
Committed: Fri Aug 25 11:27:25 2017 +0800

----------------------------------------------------------------------
 .../kylin/cube/model/CubeBuildTypeEnum.java     |   7 +-
 .../kylin/job/execution/AbstractExecutable.java |   5 +
 .../job/execution/CheckpointExecutable.java     |  60 +++++++
 .../engine/mr/common/JobInfoConverter.java      |  41 ++++-
 .../apache/kylin/rest/service/JobService.java   | 166 ++++++++++++++++++-
 5 files changed, 267 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/c0a785d4/core-cube/src/main/java/org/apache/kylin/cube/model/CubeBuildTypeEnum.java
----------------------------------------------------------------------
diff --git 
a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeBuildTypeEnum.java 
b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeBuildTypeEnum.java
index e3ae214..6a14025 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeBuildTypeEnum.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeBuildTypeEnum.java
@@ -35,5 +35,10 @@ public enum CubeBuildTypeEnum {
     /**
      * refresh segments
      */
-    REFRESH
+    REFRESH,
+
+    /**
+     * checkpoint for a set of other jobs
+     */
+    CHECKPOINT
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/c0a785d4/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
----------------------------------------------------------------------
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java 
b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index d36f598..30b6421 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -384,6 +384,11 @@ public abstract class AbstractExecutable implements 
Executable, Idempotent {
         return getDuration(getStartTime(), getEndTime(), getInterruptTime());
     }
 
+    /**
+     * Tells whether this executable is currently in the READY state.
+     *
+     * @return true when the output looked up through the manager by this executable's id
+     *         reports {@link ExecutableState#READY}
+     */
+    public boolean isReady() {
+        return ExecutableState.READY == getManager().getOutput(id).getState();
+    }
+
     /*
     * discarded is triggered by JobService, the Scheduler is not awake of that
     *

http://git-wip-us.apache.org/repos/asf/kylin/blob/c0a785d4/core-job/src/main/java/org/apache/kylin/job/execution/CheckpointExecutable.java
----------------------------------------------------------------------
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/execution/CheckpointExecutable.java
 
b/core-job/src/main/java/org/apache/kylin/job/execution/CheckpointExecutable.java
new file mode 100644
index 0000000..604f216
--- /dev/null
+++ 
b/core-job/src/main/java/org/apache/kylin/job/execution/CheckpointExecutable.java
@@ -0,0 +1,60 @@
+package org.apache.kylin.job.execution;
+
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ * A chained executable that also keeps a list of other executables to check:
+ * {@link #isReady()} only answers true once every one of them has state SUCCEED.
+ */
+public class CheckpointExecutable extends DefaultChainedExecutable {
+
+    private static final Logger logger = LoggerFactory.getLogger(CheckpointExecutable.class);
+
+    // Keys under which the values below are stored via setParam/getParam.
+    private static final String DEPLOY_ENV_NAME = "envName";
+    private static final String PROJECT_INSTANCE_NAME = "projectName";
+
+    // Executables whose state is consulted by isReady(); held in this in-memory list.
+    private final List<AbstractExecutable> subTasksForCheck = Lists.newArrayList();
+
+    /** Appends a single executable to the list consulted by {@link #isReady()}. */
+    public void addTaskForCheck(AbstractExecutable executable) {
+        subTasksForCheck.add(executable);
+    }
+
+    /** Appends every executable of the given list to the list consulted by {@link #isReady()}. */
+    public void addTaskListForCheck(List<AbstractExecutable> executableList) {
+        subTasksForCheck.addAll(executableList);
+    }
+
+    /**
+     * @return the internal list of executables to check (the live list, not a copy)
+     */
+    public List<AbstractExecutable> getSubTasksForCheck() {
+        return subTasksForCheck;
+    }
+
+    /**
+     * Ready means: the inherited readiness check passes AND each executable registered
+     * for checking has an output whose state is SUCCEED.
+     */
+    @Override
+    public boolean isReady() {
+        final boolean selfReady = super.isReady();
+        if (selfReady) {
+            for (AbstractExecutable dependency : subTasksForCheck) {
+                final ExecutableState state = getManager().getOutput(dependency.getId()).getState();
+                if (state != ExecutableState.SUCCEED) {
+                    return false;
+                }
+            }
+        }
+        return selfReady;
+    }
+
+    /** @return the value stored under the "envName" parameter */
+    public String getDeployEnvName() {
+        return getParam(DEPLOY_ENV_NAME);
+    }
+
+    /** Stores the given value under the "envName" parameter. */
+    public void setDeployEnvName(String name) {
+        setParam(DEPLOY_ENV_NAME, name);
+    }
+
+    /** @return the value stored under the "projectName" parameter */
+    public String getProjectName() {
+        return getParam(PROJECT_INSTANCE_NAME);
+    }
+
+    /** Stores the given value under the "projectName" parameter. */
+    public void setProjectName(String name) {
+        setParam(PROJECT_INSTANCE_NAME, name);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c0a785d4/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java
index c465e3f..20c5d39 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java
@@ -28,26 +28,25 @@ import org.apache.kylin.job.common.ShellExecutable;
 import org.apache.kylin.job.constant.JobStatusEnum;
 import org.apache.kylin.job.constant.JobStepStatusEnum;
 import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.CheckpointExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.Output;
 
 import com.google.common.base.Preconditions;
 
 public class JobInfoConverter {
-    public static JobInstance parseToJobInstance(AbstractExecutable job, 
Map<String, Output> outputs) {
+    public static JobInstance parseToJobInstance(CubingJob job, Map<String, 
Output> outputs) {
         if (job == null) {
             return null;
         }
-        Preconditions.checkState(job instanceof CubingJob, "illegal job type, 
id:" + job.getId());
-        CubingJob cubeJob = (CubingJob) job;
         Output output = outputs.get(job.getId());
         final JobInstance result = new JobInstance();
         result.setName(job.getName());
-        
result.setRelatedCube(CubingExecutableUtil.getCubeName(cubeJob.getParams()));
-        
result.setRelatedSegment(CubingExecutableUtil.getSegmentId(cubeJob.getParams()));
+        
result.setRelatedCube(CubingExecutableUtil.getCubeName(job.getParams()));
+        
result.setRelatedSegment(CubingExecutableUtil.getSegmentId(job.getParams()));
         result.setLastModified(output.getLastModified());
-        result.setSubmitter(cubeJob.getSubmitter());
-        result.setUuid(cubeJob.getId());
+        result.setSubmitter(job.getSubmitter());
+        result.setUuid(job.getId());
         result.setType(CubeBuildTypeEnum.BUILD);
         result.setStatus(parseToJobStatus(output.getState()));
         result.setMrWaiting(AbstractExecutable.getExtraInfoAsLong(output, 
CubingJob.MAP_REDUCE_WAIT_TIME, 0L) / 1000);
@@ -55,8 +54,32 @@ public class JobInfoConverter {
         result.setExecEndTime(AbstractExecutable.getEndTime(output));
         
result.setExecInterruptTime(AbstractExecutable.getInterruptTime(output));
         
result.setDuration(AbstractExecutable.getDuration(result.getExecStartTime(), 
result.getExecEndTime(), result.getExecInterruptTime()) / 1000);
-        for (int i = 0; i < cubeJob.getTasks().size(); ++i) {
-            AbstractExecutable task = cubeJob.getTasks().get(i);
+        for (int i = 0; i < job.getTasks().size(); ++i) {
+            AbstractExecutable task = job.getTasks().get(i);
+            result.addStep(parseToJobStep(task, i, outputs.get(task.getId())));
+        }
+        return result;
+    }
+
+    public static JobInstance parseToJobInstance(CheckpointExecutable job, 
Map<String, Output> outputs) {
+        if (job == null) {
+            return null;
+        }
+        Output output = outputs.get(job.getId());
+        final JobInstance result = new JobInstance();
+        result.setName(job.getName());
+        
result.setRelatedCube(CubingExecutableUtil.getCubeName(job.getParams()));
+        result.setLastModified(output.getLastModified());
+        result.setSubmitter(job.getSubmitter());
+        result.setUuid(job.getId());
+        result.setType(CubeBuildTypeEnum.CHECKPOINT);
+        result.setStatus(parseToJobStatus(output.getState()));
+        result.setExecStartTime(AbstractExecutable.getStartTime(output));
+        result.setExecEndTime(AbstractExecutable.getEndTime(output));
+        
result.setExecInterruptTime(AbstractExecutable.getInterruptTime(output));
+        
result.setDuration(AbstractExecutable.getDuration(result.getExecStartTime(), 
result.getExecEndTime(), result.getExecInterruptTime()) / 1000);
+        for (int i = 0; i < job.getTasks().size(); ++i) {
+            AbstractExecutable task = job.getTasks().get(i);
             result.addStep(parseToJobStep(task, i, outputs.get(task.getId())));
         }
         return result;

http://git-wip-us.apache.org/repos/asf/kylin/blob/c0a785d4/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java 
b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 5bf684a..432d300 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -50,6 +50,7 @@ import org.apache.kylin.job.constant.JobTimeFilterEnum;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.exception.SchedulerException;
 import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.CheckpointExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.Output;
@@ -315,6 +316,33 @@ public class JobService extends BasicService implements 
InitializingBean {
         return result;
     }
 
+    /**
+     * Builds the {@link JobInstance} representation of a checkpoint job from the values the
+     * executable itself reports (name, params, last-modified, submitter, id, status, duration)
+     * and adds one step per sub task, using each task's output from the executable manager.
+     *
+     * @param job the executable to convert; may be null
+     * @return the populated instance of type CHECKPOINT, or null when {@code job} is null
+     * @throws BadRequestException when {@code job} is not a {@link CheckpointExecutable}
+     */
+    protected JobInstance getCheckpointJobInstance(AbstractExecutable job) {
+        Message msg = MsgPicker.getMsg();
+
+        if (job == null) {
+            return null;
+        }
+        if (job instanceof CheckpointExecutable) {
+            CheckpointExecutable checkpoint = (CheckpointExecutable) job;
+
+            final JobInstance instance = new JobInstance();
+            instance.setName(job.getName());
+            instance.setRelatedCube(CubingExecutableUtil.getCubeName(job.getParams()));
+            instance.setLastModified(job.getLastModified());
+            instance.setSubmitter(job.getSubmitter());
+            instance.setUuid(job.getId());
+            instance.setType(CubeBuildTypeEnum.CHECKPOINT);
+            instance.setStatus(JobInfoConverter.parseToJobStatus(job.getStatus()));
+            // getDuration() is divided by 1000 before being stored on the instance.
+            instance.setDuration(job.getDuration() / 1000);
+
+            int stepIndex = 0;
+            for (AbstractExecutable task : checkpoint.getTasks()) {
+                Output taskOutput = getExecutableManager().getOutput(task.getId());
+                instance.addStep(JobInfoConverter.parseToJobStep(task, stepIndex, taskOutput));
+                stepIndex++;
+            }
+            return instance;
+        }
+        throw new BadRequestException(String.format(msg.getILLEGAL_JOB_TYPE(), job.getId()));
+    }
+
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN
             + " or hasPermission(#job, 'ADMINISTRATION') or 
hasPermission(#job, 'OPERATION') or hasPermission(#job, 'MANAGEMENT')")
     public void resumeJob(JobInstance job) {
@@ -374,6 +402,7 @@ public class JobService extends BasicService implements 
InitializingBean {
         Integer limit = (null == limitValue) ? 30 : limitValue;
         Integer offset = (null == offsetValue) ? 0 : offsetValue;
         List<JobInstance> jobs = searchJobsByCubeName(cubeNameSubstring, 
projectName, statusList, timeFilter);
+
         Collections.sort(jobs);
 
         if (jobs.size() <= offset) {
@@ -389,12 +418,40 @@ public class JobService extends BasicService implements 
InitializingBean {
 
     public List<JobInstance> searchJobsByCubeName(final String 
cubeNameSubstring, final String projectName,
             final List<JobStatusEnum> statusList, final JobTimeFilterEnum 
timeFilter) {
-        return innerSearchCubingJobs(cubeNameSubstring, null, projectName, 
statusList, timeFilter);
+        return searchJobsByCubeName(cubeNameSubstring, projectName, 
statusList, timeFilter, JobSearchMode.ALL);
+    }
+
+    /**
+     * Searches jobs by cube name only (no job-name filter), restricted to the job kinds
+     * selected by {@code jobSearchMode}. Delegates to {@code innerSearchJobs}.
+     */
+    public List<JobInstance> searchJobsByCubeName(final String cubeNameSubstring, final String projectName,
+            final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter, JobSearchMode jobSearchMode) {
+        final String noJobNameFilter = null;
+        return innerSearchJobs(cubeNameSubstring, noJobNameFilter, projectName, statusList, timeFilter,
+                jobSearchMode);
     }
 
     public List<JobInstance> searchJobsByJobName(final String jobName, final 
String projectName,
             final List<JobStatusEnum> statusList, final JobTimeFilterEnum 
timeFilter) {
-        return innerSearchCubingJobs(null, jobName, projectName, statusList, 
timeFilter);
+        return searchJobsByJobName(jobName, projectName, statusList, 
timeFilter, JobSearchMode.ALL);
+    }
+
+    /**
+     * Searches jobs by job name only (no cube-name filter), restricted to the job kinds
+     * selected by {@code jobSearchMode}. Delegates to {@code innerSearchJobs}.
+     */
+    public List<JobInstance> searchJobsByJobName(final String jobName, final String projectName,
+            final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter, JobSearchMode jobSearchMode) {
+        final String noCubeNameFilter = null;
+        return innerSearchJobs(noCubeNameFilter, jobName, projectName, statusList, timeFilter, jobSearchMode);
+    }
+
+    /**
+     * Runs the cubing-job search, the checkpoint-job search, or both, depending on
+     * {@code jobSearchMode}, and returns the combined hits in a new list (cubing jobs first).
+     * Any mode other than CUBING_ONLY and CHECKPOINT_ONLY searches both kinds.
+     */
+    public List<JobInstance> innerSearchJobs(final String cubeName, final String jobName, final String projectName,
+            final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter, JobSearchMode jobSearchMode) {
+        final boolean withCubing;
+        final boolean withCheckpoint;
+        switch (jobSearchMode) {
+        case CUBING_ONLY:
+            withCubing = true;
+            withCheckpoint = false;
+            break;
+        case CHECKPOINT_ONLY:
+            withCubing = false;
+            withCheckpoint = true;
+            break;
+        default: // ALL, and anything not listed above
+            withCubing = true;
+            withCheckpoint = true;
+            break;
+        }
+
+        List<JobInstance> found = Lists.newArrayList();
+        if (withCubing) {
+            found.addAll(innerSearchCubingJobs(cubeName, jobName, projectName, statusList, timeFilter));
+        }
+        if (withCheckpoint) {
+            found.addAll(innerSearchCheckpointJobs(cubeName, jobName, projectName, statusList, timeFilter));
+        }
+        return found;
     }
 
     public List<JobInstance> innerSearchCubingJobs(final String cubeName, 
final String jobName,
@@ -494,6 +551,108 @@ public class JobService extends BasicService implements 
InitializingBean {
         return results;
     }
 
+    /**
+     * Searches checkpoint jobs and converts every hit into a {@link JobInstance}.
+     *
+     * The lower time bound is derived from {@code timeFilter} via {@code getTimeStartInMillis}
+     * relative to the current time; the upper bound is {@code Long.MAX_VALUE}. Names are
+     * matched in non-exact mode.
+     */
+    public List<JobInstance> innerSearchCheckpointJobs(final String cubeName, final String jobName,
+            final String projectName, final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter) {
+        // Time range: from the filter's start up to "no upper limit".
+        Calendar now = Calendar.getInstance();
+        now.setTime(new Date());
+        final long fromMillis = getTimeStartInMillis(now, timeFilter);
+        final long toMillis = Long.MAX_VALUE;
+
+        final Set<ExecutableState> wantedStates = convertStatusEnumToStates(statusList);
+        final Map<String, Output> allOutputs = getExecutableManager().getAllOutputs(fromMillis, toMillis);
+
+        final List<CheckpointExecutable> matches = innerSearchCheckpointJobs(cubeName, jobName, wantedStates,
+                fromMillis, toMillis, allOutputs, false, projectName);
+
+        List<JobInstance> instances = Lists.newArrayList();
+        for (CheckpointExecutable match : matches) {
+            instances.add(JobInfoConverter.parseToJobInstance(match, allOutputs));
+        }
+        return instances;
+    }
+
+    /**
+     * Collects the checkpoint jobs that pass every given filter.
+     *
+     * Candidates are obtained from {@code getAllAbstractExecutables} of the executable manager
+     * for the given time bounds. A candidate is kept when all of the following hold:
+     * <ul>
+     * <li>cube name: {@code cubeName} is empty, or the job carries no cube name, or the job's
+     * cube name equals ({@code nameExactMatch}) / contains (otherwise) {@code cubeName};</li>
+     * <li>project: {@code projectName} is null or unknown to the project manager, or it equals
+     * the job's project name;</li>
+     * <li>state: {@code allOutputs} has an entry for the job and its state is in
+     * {@code statusList};</li>
+     * <li>job name: {@code jobName} is empty, or the job's name equals / contains it.</li>
+     * </ul>
+     * All name comparisons are case-insensitive: BOTH sides are lower-cased, so a search term
+     * containing upper-case letters matches as well.
+     *
+     * @param cubeName          cube name (or fragment) to look for; empty means "any"
+     * @param jobName           job name (or fragment) to look for; empty means "any"
+     * @param statusList        accepted executable states
+     * @param timeStartInMillis lower time bound handed to the executable manager
+     * @param timeEndInMillis   upper time bound handed to the executable manager
+     * @param allOutputs        outputs keyed by executable id, used for the state filter
+     * @param nameExactMatch    true for whole-name comparison, false for substring matching
+     * @param projectName       project to restrict to; null means "any"
+     * @return a new list with the matching checkpoint executables
+     */
+    public List<CheckpointExecutable> innerSearchCheckpointJobs(final String cubeName, final String jobName,
+            final Set<ExecutableState> statusList, long timeStartInMillis, long timeEndInMillis,
+            final Map<String, Output> allOutputs, final boolean nameExactMatch, final String projectName) {
+        return Lists.newArrayList(FluentIterable
+                .from(getExecutableManager().getAllAbstractExecutables(timeStartInMillis, timeEndInMillis,
+                        CheckpointExecutable.class))
+                .filter(new Predicate<AbstractExecutable>() {
+                    @Override
+                    public boolean apply(AbstractExecutable executable) {
+                        if (!(executable instanceof CheckpointExecutable)) {
+                            return false;
+                        }
+                        if (StringUtils.isEmpty(cubeName)) {
+                            return true;
+                        }
+                        String executableCubeName = CubingExecutableUtil.getCubeName(executable.getParams());
+                        if (executableCubeName == null) {
+                            // NOTE(review): a job without a cube name passes any cube filter;
+                            // kept as before - confirm this is intended.
+                            return true;
+                        }
+                        // Lower-case both sides; previously only the stored name was lower-cased
+                        // in exact mode, so a mixed-case search term could never match.
+                        String actual = executableCubeName.toLowerCase();
+                        String wanted = cubeName.toLowerCase();
+                        return nameExactMatch ? actual.equals(wanted) : actual.contains(wanted);
+                    }
+                }).transform(new Function<AbstractExecutable, CheckpointExecutable>() {
+                    @Override
+                    public CheckpointExecutable apply(AbstractExecutable executable) {
+                        // Safe: the preceding filter only lets CheckpointExecutable instances through.
+                        return (CheckpointExecutable) executable;
+                    }
+                }).filter(Predicates.and(new Predicate<CheckpointExecutable>() {
+                    @Override
+                    public boolean apply(CheckpointExecutable executable) {
+                        if (null == projectName || null == getProjectManager().getProject(projectName)) {
+                            return true;
+                        }
+                        return projectName.equals(executable.getProjectName());
+                    }
+                }, new Predicate<CheckpointExecutable>() {
+                    @Override
+                    public boolean apply(CheckpointExecutable executable) {
+                        Output output = allOutputs.get(executable.getId());
+                        return output != null && statusList.contains(output.getState());
+                    }
+                }, new Predicate<CheckpointExecutable>() {
+                    @Override
+                    public boolean apply(@Nullable CheckpointExecutable checkpointExecutable) {
+                        if (checkpointExecutable == null) {
+                            return false;
+                        }
+                        if (Strings.isEmpty(jobName)) {
+                            return true;
+                        }
+                        // Lower-case both sides; previously jobName was compared as typed, so
+                        // any upper-case letter in it made the filter reject every job.
+                        String actual = checkpointExecutable.getName().toLowerCase();
+                        String wanted = jobName.toLowerCase();
+                        return nameExactMatch ? actual.equals(wanted) : actual.contains(wanted);
+                    }
+                })));
+    }
+
     public List<CubingJob> listJobsByRealizationName(final String 
realizationName, final String projectName,
             final Set<ExecutableState> statusList) {
         return innerSearchCubingJobs(realizationName, null, statusList, 0L, 
Long.MAX_VALUE,
@@ -504,4 +663,7 @@ public class JobService extends BasicService implements 
InitializingBean {
         return listJobsByRealizationName(realizationName, projectName, 
EnumSet.allOf(ExecutableState.class));
     }
 
+    /** Selects which kinds of jobs a search covers. */
+    public enum JobSearchMode {
+        /** Only the cubing-job search is run. */
+        CUBING_ONLY,
+        /** Only the checkpoint-job search is run. */
+        CHECKPOINT_ONLY,
+        /** Both searches are run. */
+        ALL
+    }
 }

Reply via email to