Repository: kylin Updated Branches: refs/heads/master 827205f17 -> e1d5b2938
KYLIN-277 add API for pause and rollback job Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/e1d5b293 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/e1d5b293 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/e1d5b293 Branch: refs/heads/master Commit: e1d5b29385a5c044fe7f60d27d59baf82499b3de Parents: 827205f Author: shaofengshi <shaofeng...@apache.org> Authored: Thu Nov 24 18:14:27 2016 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Thu Nov 24 18:14:27 2016 +0800 ---------------------------------------------------------------------- .../kylin/job/execution/AbstractExecutable.java | 2 +- .../kylin/job/execution/ExecutableManager.java | 18 +++++++++++++++++ .../kylin/job/execution/ExecutableState.java | 2 ++ .../engine/mr/steps/SaveStatisticsStep.java | 1 - .../kylin/rest/controller/JobController.java | 21 +++++++++++++++++++- .../apache/kylin/rest/service/JobService.java | 7 +++++++ 6 files changed, 48 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/e1d5b293/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java index 551241b..cd9b033 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java @@ -82,7 +82,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent { protected void onExecuteFinished(ExecuteResult result, ExecutableContext executableContext) { setEndTime(System.currentTimeMillis()); - if (!isDiscarded()) { + if (!isDiscarded() && !isRunnable()) { if (result.succeed()) { getManager().updateJobOutput(getId(), ExecutableState.SUCCEED, null, result.output()); } else { http://git-wip-us.apache.org/repos/asf/kylin/blob/e1d5b293/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java index 52d4d1c..4351e31 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java @@ -292,6 +292,24 @@ public class ExecutableManager { updateJobOutput(jobId, ExecutableState.DISCARDED, null, null); } + + public void rollbackJob(String jobId, String stepId) { + AbstractExecutable job = getJob(jobId); + if (job == null) { + return; + } + + if (job instanceof DefaultChainedExecutable) { + List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks(); + for (AbstractExecutable task : tasks) { + if (task.getId().compareTo(stepId) >= 0) { + logger.debug("rollback task : " + task); + updateJobOutput(task.getId(), ExecutableState.READY, null, null); + } + } + } + } + public void pauseJob(String jobId) { AbstractExecutable job = getJob(jobId); if (job == null) { http://git-wip-us.apache.org/repos/asf/kylin/blob/e1d5b293/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java index 0684eff..910bd7e 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java @@ -69,6 +69,8 @@ public enum ExecutableState { VALID_STATE_TRANSFER.put(ExecutableState.RUNNING, ExecutableState.STOPPED); + //rollback + VALID_STATE_TRANSFER.put(ExecutableState.SUCCEED, ExecutableState.READY); } http://git-wip-us.apache.org/repos/asf/kylin/blob/e1d5b293/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java index 7718bfb..79346a5 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java @@ -73,7 +73,6 @@ public class SaveStatisticsStep extends AbstractExecutable { rs.putResource(statisticsFileName, is, System.currentTimeMillis()); } finally { IOUtils.closeStream(is); - fs.delete(statisticsFilePath, true); } decideCubingAlgorithm(newSegment, kylinConf); http://git-wip-us.apache.org/repos/asf/kylin/blob/e1d5b293/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java index 1af4394..12f9e2e 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java @@ -134,7 +134,7 @@ public class JobController extends BasicController { } /** - * Cancel a job + * Cancel/discard a job * * @return * @throws IOException @@ -175,6 +175,25 @@ public class JobController extends BasicController { } + /** + * Rollback a job to the given step + * + * @return + * @throws IOException + */ + @RequestMapping(value = "/{jobId}/steps/{stepId}/rollback", method = { RequestMethod.PUT }) + @ResponseBody + public JobInstance rollback(@PathVariable String jobId, @PathVariable String stepId) { + try { + final JobInstance jobInstance = jobService.getJobInstance(jobId); + jobService.rollbackJob(jobInstance, stepId); + return jobService.getJobInstance(jobId); + } catch (Exception e) { + logger.error(e.getLocalizedMessage(), e); + throw new InternalErrorException(e); + } + } + public void setJobService(JobService jobService) { this.jobService = jobService; } http://git-wip-us.apache.org/repos/asf/kylin/blob/e1d5b293/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java index 19902f0..d6caddb 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -460,6 +460,13 @@ public class JobService extends BasicService implements InitializingBean { } @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#job, 'ADMINISTRATION') or hasPermission(#job, 'OPERATION') or hasPermission(#job, 'MANAGEMENT')") + public void rollbackJob(JobInstance job, String stepId) throws IOException, JobException { + lockSegment(job.getRelatedSegment()); + + getExecutableManager().rollbackJob(job.getId(), stepId); + } + + @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#job, 'ADMINISTRATION') or hasPermission(#job, 'OPERATION') or hasPermission(#job, 'MANAGEMENT')") public JobInstance cancelJob(JobInstance job) throws IOException, JobException { if (null == job.getRelatedCube() || null == getCubeManager().getCube(job.getRelatedCube())) { getExecutableManager().discardJob(job.getId());