APACHE-KYLIN-2731: Introduce checkpoint executable
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c0a785d4 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c0a785d4 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c0a785d4 Branch: refs/heads/yaho-cube-planner Commit: c0a785d467ea8254705b20c4433e9f60933114a1 Parents: abf4a97 Author: Zhong <nju_y...@apache.org> Authored: Fri Aug 25 11:27:25 2017 +0800 Committer: Zhong <nju_y...@apache.org> Committed: Fri Aug 25 11:27:25 2017 +0800 ---------------------------------------------------------------------- .../kylin/cube/model/CubeBuildTypeEnum.java | 7 +- .../kylin/job/execution/AbstractExecutable.java | 5 + .../job/execution/CheckpointExecutable.java | 60 +++++++ .../engine/mr/common/JobInfoConverter.java | 41 ++++- .../apache/kylin/rest/service/JobService.java | 166 ++++++++++++++++++- 5 files changed, 267 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/c0a785d4/core-cube/src/main/java/org/apache/kylin/cube/model/CubeBuildTypeEnum.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeBuildTypeEnum.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeBuildTypeEnum.java index e3ae214..6a14025 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeBuildTypeEnum.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeBuildTypeEnum.java @@ -35,5 +35,10 @@ public enum CubeBuildTypeEnum { /** * refresh segments */ - REFRESH + REFRESH, + + /** + * checkpoint for set of other jobs + */ + CHECKPOINT } http://git-wip-us.apache.org/repos/asf/kylin/blob/c0a785d4/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java ---------------------------------------------------------------------- diff --git
a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java index d36f598..30b6421 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java @@ -384,6 +384,11 @@ public abstract class AbstractExecutable implements Executable, Idempotent { return getDuration(getStartTime(), getEndTime(), getInterruptTime()); } + /** Returns true when the output stored for this executable's id is in state {@code ExecutableState.READY}. */ public boolean isReady() { + final Output output = getManager().getOutput(id); + return output.getState() == ExecutableState.READY; + } + /* * discarded is triggered by JobService, the Scheduler is not awake of that * http://git-wip-us.apache.org/repos/asf/kylin/blob/c0a785d4/core-job/src/main/java/org/apache/kylin/job/execution/CheckpointExecutable.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/CheckpointExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/CheckpointExecutable.java new file mode 100644 index 0000000..604f216 --- /dev/null +++ b/core-job/src/main/java/org/apache/kylin/job/execution/CheckpointExecutable.java @@ -0,0 +1,60 @@ +package org.apache.kylin.job.execution; + +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +/** Chained executable whose readiness also depends on other executables registered through addTaskForCheck / addTaskListForCheck (see isReady). It carries a deploy-env name and a project name as job params. */ public class CheckpointExecutable extends DefaultChainedExecutable { + + /* NOTE(review): logger is not referenced anywhere in this class */ private static final Logger logger = LoggerFactory.getLogger(CheckpointExecutable.class); + 
+ private static final String DEPLOY_ENV_NAME = "envName"; + private static final String PROJECT_INSTANCE_NAME = "projectName"; + + /* NOTE(review): held only in memory; unlike the env/project names it is not written via setParam, so an instance reloaded from the job store may presumably start with an empty list and isReady() would then ignore the checked tasks; confirm how this list is persisted */ private final List<AbstractExecutable> subTasksForCheck = Lists.newArrayList(); + + /** Registers one executable that must reach SUCCEED before this checkpoint is ready. */ public void addTaskForCheck(AbstractExecutable executable) { + this.subTasksForCheck.add(executable); + } + + /** Registers several executables that must all reach SUCCEED before this checkpoint is ready. */ public void
addTaskListForCheck(List<AbstractExecutable> executableList) { + this.subTasksForCheck.addAll(executableList); + } + + /* NOTE(review): returns the internal mutable list, so callers can change the set of checked tasks through it */ public List<AbstractExecutable> getSubTasksForCheck() { + return subTasksForCheck; + } + + /** Returns false unless super.isReady() holds and the output state of every registered check task is SUCCEED. */ @Override + public boolean isReady() { + if (!super.isReady()) { + return false; + } + for (Executable task : subTasksForCheck) { + final Output output = getManager().getOutput(task.getId()); + if (output.getState() != ExecutableState.SUCCEED) { + return false; + } + } + return true; + } + + public String getDeployEnvName() { + return getParam(DEPLOY_ENV_NAME); + } + + public void setDeployEnvName(String name) { + setParam(DEPLOY_ENV_NAME, name); + } + + public String getProjectName() { + return getParam(PROJECT_INSTANCE_NAME); + } + + public void setProjectName(String name) { + setParam(PROJECT_INSTANCE_NAME, name); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/c0a785d4/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java index c465e3f..20c5d39 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java @@ -28,26 +28,25 @@ import org.apache.kylin.job.common.ShellExecutable; import org.apache.kylin.job.constant.JobStatusEnum; import org.apache.kylin.job.constant.JobStepStatusEnum; import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.CheckpointExecutable; import org.apache.kylin.job.execution.ExecutableState; import 
org.apache.kylin.job.execution.Output; import com.google.common.base.Preconditions; public class JobInfoConverter { - public static JobInstance parseToJobInstance(AbstractExecutable job, Map<String, Output>
outputs) { + /* NOTE(review): the parameter type is narrowed from AbstractExecutable to CubingJob, so callers holding an AbstractExecutable must now cast; the removed Preconditions.checkState was the only Preconditions use visible in this hunk, so check whether that import is now unused */ public static JobInstance parseToJobInstance(CubingJob job, Map<String, Output> outputs) { if (job == null) { return null; } - Preconditions.checkState(job instanceof CubingJob, "illegal job type, id:" + job.getId()); - CubingJob cubeJob = (CubingJob) job; Output output = outputs.get(job.getId()); final JobInstance result = new JobInstance(); result.setName(job.getName()); - result.setRelatedCube(CubingExecutableUtil.getCubeName(cubeJob.getParams())); - result.setRelatedSegment(CubingExecutableUtil.getSegmentId(cubeJob.getParams())); + result.setRelatedCube(CubingExecutableUtil.getCubeName(job.getParams())); + result.setRelatedSegment(CubingExecutableUtil.getSegmentId(job.getParams())); result.setLastModified(output.getLastModified()); - result.setSubmitter(cubeJob.getSubmitter()); - result.setUuid(cubeJob.getId()); + result.setSubmitter(job.getSubmitter()); + result.setUuid(job.getId()); result.setType(CubeBuildTypeEnum.BUILD); result.setStatus(parseToJobStatus(output.getState())); result.setMrWaiting(AbstractExecutable.getExtraInfoAsLong(output, CubingJob.MAP_REDUCE_WAIT_TIME, 0L) / 1000); @@ -55,8 +54,32 @@ public class JobInfoConverter { result.setExecEndTime(AbstractExecutable.getEndTime(output)); result.setExecInterruptTime(AbstractExecutable.getInterruptTime(output)); result.setDuration(AbstractExecutable.getDuration(result.getExecStartTime(), result.getExecEndTime(), result.getExecInterruptTime()) / 1000); - for (int i = 0; i < cubeJob.getTasks().size(); ++i) { - AbstractExecutable task = cubeJob.getTasks().get(i); + for (int i = 0; i < job.getTasks().size(); ++i) { + AbstractExecutable task = job.getTasks().get(i); + result.addStep(parseToJobStep(task, i, outputs.get(task.getId()))); + } + 
return result; + } + + /** Converts a checkpoint job plus the outputs map into a JobInstance of type CHECKPOINT, with one step per chained task; returns null for a null job. NOTE(review): output is read from outputs.get(job.getId()) without a null check, so a job missing from the map throws NullPointerException. */ public static JobInstance parseToJobInstance(CheckpointExecutable job, Map<String, Output> outputs) { + if (job == null) { + return null; + } + Output output = outputs.get(job.getId()); + final JobInstance result = new JobInstance(); +
result.setName(job.getName()); + result.setRelatedCube(CubingExecutableUtil.getCubeName(job.getParams())); + result.setLastModified(output.getLastModified()); + result.setSubmitter(job.getSubmitter()); + result.setUuid(job.getId()); + result.setType(CubeBuildTypeEnum.CHECKPOINT); + result.setStatus(parseToJobStatus(output.getState())); + result.setExecStartTime(AbstractExecutable.getStartTime(output)); + result.setExecEndTime(AbstractExecutable.getEndTime(output)); + result.setExecInterruptTime(AbstractExecutable.getInterruptTime(output)); + result.setDuration(AbstractExecutable.getDuration(result.getExecStartTime(), result.getExecEndTime(), result.getExecInterruptTime()) / 1000); + for (int i = 0; i < job.getTasks().size(); ++i) { + AbstractExecutable task = job.getTasks().get(i); result.addStep(parseToJobStep(task, i, outputs.get(task.getId()))); } return result; http://git-wip-us.apache.org/repos/asf/kylin/blob/c0a785d4/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java index 5bf684a..432d300 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -50,6 +50,7 @@ import org.apache.kylin.job.constant.JobTimeFilterEnum; import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.job.exception.SchedulerException; import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.CheckpointExecutable; import org.apache.kylin.job.execution.DefaultChainedExecutable; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.execution.Output; @@ -315,6 +316,33 @@ public class JobService extends BasicService implements InitializingBean { return
result; } + /** Builds a CHECKPOINT JobInstance from the executable, reading each step's output from the executable manager; returns null for a null job and throws BadRequestException when the job is not a CheckpointExecutable. NOTE(review): unlike JobInfoConverter.parseToJobInstance(CheckpointExecutable, ...) this does not set exec start/end/interrupt times; consider delegating to that converter so both views stay consistent. */ protected JobInstance getCheckpointJobInstance(AbstractExecutable job) { + Message msg = MsgPicker.getMsg(); + + if (job == null) { + return null; + } + if (!(job instanceof CheckpointExecutable)) { + throw new BadRequestException(String.format(msg.getILLEGAL_JOB_TYPE(), job.getId())); + } + + CheckpointExecutable checkpointExecutable = (CheckpointExecutable) job; + final JobInstance result = new JobInstance(); + result.setName(job.getName()); + result.setRelatedCube(CubingExecutableUtil.getCubeName(job.getParams())); + result.setLastModified(job.getLastModified()); + result.setSubmitter(job.getSubmitter()); + result.setUuid(job.getId()); + result.setType(CubeBuildTypeEnum.CHECKPOINT); + result.setStatus(JobInfoConverter.parseToJobStatus(job.getStatus())); + result.setDuration(job.getDuration() / 1000); + for (int i = 0; i < checkpointExecutable.getTasks().size(); ++i) { + AbstractExecutable task = checkpointExecutable.getTasks().get(i); + result.addStep(JobInfoConverter.parseToJobStep(task, i, getExecutableManager().getOutput(task.getId()))); + } + return result; + } + @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#job, 'ADMINISTRATION') or hasPermission(#job, 'OPERATION') or hasPermission(#job, 'MANAGEMENT')") public void resumeJob(JobInstance job) { @@ -374,6 +402,7 @@ public class JobService extends BasicService implements InitializingBean { Integer limit = (null == limitValue) ? 30 : limitValue; Integer offset = (null == offsetValue) ?
0 : offsetValue; List<JobInstance> jobs = searchJobsByCubeName(cubeNameSubstring, projectName, statusList, timeFilter); + Collections.sort(jobs); if (jobs.size() <= offset) { @@ -389,12 +418,40 @@ public class JobService extends BasicService implements InitializingBean { public List<JobInstance> searchJobsByCubeName(final String cubeNameSubstring, final String projectName, final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter) { - return innerSearchCubingJobs(cubeNameSubstring, null, projectName, statusList, timeFilter); + return searchJobsByCubeName(cubeNameSubstring, projectName, statusList, timeFilter, JobSearchMode.ALL); + } + + /** Same as the four-argument overload, but limited to the job families selected by jobSearchMode. */ public List<JobInstance> searchJobsByCubeName(final String cubeNameSubstring, final String projectName, + final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter, JobSearchMode jobSearchMode) { + return innerSearchJobs(cubeNameSubstring, null, projectName, statusList, timeFilter, jobSearchMode); } public List<JobInstance> searchJobsByJobName(final String jobName, final String projectName, final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter) { - return innerSearchCubingJobs(null, jobName, projectName, statusList, timeFilter); + return searchJobsByJobName(jobName, projectName, statusList, timeFilter, JobSearchMode.ALL); + } + + /** Same as the four-argument overload, but limited to the job families selected by jobSearchMode. */ public List<JobInstance> searchJobsByJobName(final String jobName, final String projectName, + final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter, JobSearchMode jobSearchMode) { + return innerSearchJobs(null, jobName, projectName, statusList, timeFilter, jobSearchMode); + } + + /** Runs the cubing search, the checkpoint search, or both (ALL and any unlisted mode) and returns the concatenated results unsorted, cubing jobs first. 
NOTE(review): a null jobSearchMode throws NullPointerException at the switch. */ public List<JobInstance> innerSearchJobs(final String cubeName, final String jobName, final String projectName, + final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter, JobSearchMode jobSearchMode) { + List<JobInstance> result = Lists.newArrayList(); + switch (jobSearchMode) { + case CUBING_ONLY: + result.addAll(innerSearchCubingJobs(cubeName,
jobName, projectName, statusList, timeFilter)); + break; + case CHECKPOINT_ONLY: + result.addAll(innerSearchCheckpointJobs(cubeName, jobName, projectName, statusList, timeFilter)); + break; + case ALL: + default: + result.addAll(innerSearchCubingJobs(cubeName, jobName, projectName, statusList, timeFilter)); + result.addAll(innerSearchCheckpointJobs(cubeName, jobName, projectName, statusList, timeFilter)); + } + return result; } public List<JobInstance> innerSearchCubingJobs(final String cubeName, final String jobName, @@ -494,6 +551,108 @@ public class JobService extends BasicService implements InitializingBean { return results; } + /** Finds checkpoint jobs within the time range derived from timeFilter whose state is in statusList, filtering by cubeName / jobName substring (nameExactMatch=false) and projectName, and converts each one to a JobInstance. */ public List<JobInstance> innerSearchCheckpointJobs(final String cubeName, final String jobName, + final String projectName, final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter) { + // prepare time range + Calendar calendar = Calendar.getInstance(); + calendar.setTime(new Date()); + long timeStartInMillis = getTimeStartInMillis(calendar, timeFilter); + long timeEndInMillis = Long.MAX_VALUE; + Set<ExecutableState> states = convertStatusEnumToStates(statusList); + final Map<String, Output> allOutputs = getExecutableManager().getAllOutputs(timeStartInMillis, timeEndInMillis); + + return Lists + .newArrayList(FluentIterable + .from(innerSearchCheckpointJobs(cubeName, jobName, states, timeStartInMillis, timeEndInMillis, + allOutputs, false, projectName)) + .transform(new Function<CheckpointExecutable, JobInstance>() { + @Override + public JobInstance apply(CheckpointExecutable checkpointExecutable) { + return JobInfoConverter.parseToJobInstance(checkpointExecutable, allOutputs); + } + })); + } + + /** Filters the checkpoint executables in the time range by cube name, project, state (looked up in 
allOutputs; jobs without an output are dropped) and job name. */ public List<CheckpointExecutable> innerSearchCheckpointJobs(final String cubeName, final String jobName, + final Set<ExecutableState> statusList, long timeStartInMillis, long timeEndInMillis, + final Map<String, Output> allOutputs, final boolean nameExactMatch, final String projectName) { + List<CheckpointExecutable> results =
Lists + .newArrayList( + FluentIterable + .from(getExecutableManager().getAllAbstractExecutables(timeStartInMillis, + timeEndInMillis, CheckpointExecutable.class)) + .filter(new Predicate<AbstractExecutable>() { + @Override + public boolean apply(AbstractExecutable executable) { + if (executable instanceof CheckpointExecutable) { + if (StringUtils.isEmpty(cubeName)) { + return true; + } + String executableCubeName = CubingExecutableUtil + .getCubeName(executable.getParams()); + if (executableCubeName == null) /* NOTE(review): a job without a cube name matches every cube filter */ + return true; + if (nameExactMatch) + return executableCubeName.toLowerCase().equals(cubeName); /* NOTE(review): cubeName is not lowercased here (the substring branch below does lowercase it), so an exact match fails for any requested cube name containing upper-case letters */ + else + return executableCubeName.toLowerCase() + .contains(cubeName.toLowerCase()); + } else { + return false; + } + } + }).transform(new Function<AbstractExecutable, CheckpointExecutable>() { + @Override + public CheckpointExecutable apply(AbstractExecutable executable) { + return (CheckpointExecutable) executable; + } + }).filter(Predicates.and(new Predicate<CheckpointExecutable>() { + @Override + public boolean apply(CheckpointExecutable executable) { + if (null == projectName + || null == getProjectManager().getProject(projectName)) { + /* NOTE(review): an unknown project name disables the project filter instead of matching nothing */ return true; + } else { + return projectName.equals(executable.getProjectName()); + } + } + }, new Predicate<CheckpointExecutable>() { + @Override + public boolean apply(CheckpointExecutable executable) { + try { + Output output = allOutputs.get(executable.getId()); + if (output == null) { + return false; + } + + ExecutableState state = output.getState(); + boolean ret = statusList.contains(state); + return ret; + } catch (Exception e) { + /* NOTE(review): rethrows unchanged, so this try/catch has no effect and can be removed 
*/ throw e; + } + } + }, new Predicate<CheckpointExecutable>() { + @Override + public boolean apply(@Nullable CheckpointExecutable checkpointExecutable) { + if (checkpointExecutable == null) { + return false; + } + + /* NOTE(review): the cube-name predicate above uses StringUtils.isEmpty; confirm which Strings class is imported for this call */ if (Strings.isEmpty(jobName)) { + return true; + } + + /* NOTE(review): jobName is never lowercased, so both branches below only match when the caller passes a lower-case jobName */ if (nameExactMatch) { + return checkpointExecutable.getName().toLowerCase().equals(jobName); + } else { + return
checkpointExecutable.getName().toLowerCase().contains(jobName); + } + } + }))); + return results; + } + public List<CubingJob> listJobsByRealizationName(final String realizationName, final String projectName, final Set<ExecutableState> statusList) { return innerSearchCubingJobs(realizationName, null, statusList, 0L, Long.MAX_VALUE, @@ -504,4 +663,7 @@ public class JobService extends BasicService implements InitializingBean { return listJobsByRealizationName(realizationName, projectName, EnumSet.allOf(ExecutableState.class)); } + /** Selects which job families innerSearchJobs returns: cubing jobs only, checkpoint jobs only, or both. */ public enum JobSearchMode { + CUBING_ONLY, CHECKPOINT_ONLY, ALL + } }