Repository: kylin
Updated Branches:
  refs/heads/master 1015daedf -> 5bf6582f6
KYLIN-1811 Error step may be skipped sometimes when resume a cube job Signed-off-by: Yang Li <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5bf6582f Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5bf6582f Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5bf6582f Branch: refs/heads/master Commit: 5bf6582f66bedb5d25baaa0d82ae193892b97e0c Parents: 1015dae Author: Ma,Gang <[email protected]> Authored: Wed Jun 22 11:07:19 2016 +0800 Committer: Yang Li <[email protected]> Committed: Sun Jun 26 20:21:35 2016 +0800 ---------------------------------------------------------------------- .../kylin/job/execution/DefaultChainedExecutable.java | 14 ++++++++++++++ .../apache/kylin/job/manager/ExecutableManager.java | 2 +- 2 files changed, 15 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/5bf6582f/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java index 7403715..b130f5b 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java @@ -46,6 +46,13 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai final int size = executables.size(); for (int i = 0; i < size; ++i) { Executable subTask = executables.get(i); + ExecutableState state = subTask.getStatus(); + if (state == ExecutableState.RUNNING){ + // there is already running subtask, no need to start a new subtask + break; + } else if (state == ExecutableState.ERROR){ + throw new 
IllegalStateException("invalid subtask state, subtask:" + subTask.getName() + ", state:" + subTask.getStatus()); + } if (subTask.isRunnable()) { return subTask.execute(context); } @@ -53,6 +60,7 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai return new ExecuteResult(ExecuteResult.State.SUCCEED, null); } + @Override protected void onExecuteStart(ExecutableContext executableContext) { Map<String, String> info = Maps.newHashMap(); @@ -74,6 +82,7 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai List<? extends Executable> jobs = getTasks(); boolean allSucceed = true; boolean hasError = false; + boolean hasRunning = false; for (Executable task : jobs) { final ExecutableState status = task.getStatus(); if (status == ExecutableState.ERROR) { @@ -82,6 +91,9 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai if (status != ExecutableState.SUCCEED) { allSucceed = false; } + if (status == ExecutableState.RUNNING) { + hasRunning = true; + } } if (allSucceed) { setEndTime(System.currentTimeMillis()); @@ -91,6 +103,8 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai setEndTime(System.currentTimeMillis()); jobService.updateJobOutput(getId(), ExecutableState.ERROR, null, null); notifyUserStatusChange(executableContext, ExecutableState.ERROR); + } else if (hasRunning){ + jobService.updateJobOutput(getId(), ExecutableState.RUNNING, null, null); } else { jobService.updateJobOutput(getId(), ExecutableState.READY, null, null); } http://git-wip-us.apache.org/repos/asf/kylin/blob/5bf6582f/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java index 7b4e0f0..3a19486 100644 --- 
a/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java +++ b/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java @@ -246,7 +246,6 @@ public class ExecutableManager { if (job == null) { return; } - updateJobOutput(jobId, ExecutableState.READY, null, null); if (job instanceof DefaultChainedExecutable) { List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks(); for (AbstractExecutable task : tasks) { @@ -256,6 +255,7 @@ public class ExecutableManager { } } } + updateJobOutput(jobId, ExecutableState.READY, null, null); } public void discardJob(String jobId) {
