Running task's status should be set to READY after metastore recovery
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c0942651 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c0942651 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c0942651 Branch: refs/heads/master Commit: c09426515918b53495811ec0de6b3a21cb49f383 Parents: 05610de Author: nichunen <chunen...@kyligence.io> Authored: Wed Dec 13 16:14:29 2017 +0800 Committer: Hongbin Ma <m...@kyligence.io> Committed: Wed Dec 13 17:23:48 2017 +0800 ---------------------------------------------------------------------- .../kylin/job/execution/ExecutableManager.java | 31 +++++++++++++++++--- .../job/impl/threadpool/DefaultScheduler.java | 2 +- 2 files changed, 28 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/c0942651/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java index 9f67a2b..6110573 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java @@ -61,7 +61,7 @@ public class ExecutableManager { } // ============================================================================ - + private final KylinConfig config; private final ExecutableDao executableDao; @@ -223,7 +223,8 @@ public class ExecutableManager { * @param expectedClass * @return */ - public List<AbstractExecutable> getAllAbstractExecutables(long timeStartInMillis, long timeEndInMillis, Class<? extends AbstractExecutable> expectedClass) { + public List<AbstractExecutable> getAllAbstractExecutables(long timeStartInMillis, long timeEndInMillis, + Class<? 
extends AbstractExecutable> expectedClass) { try { List<AbstractExecutable> ret = Lists.newArrayList(); for (ExecutablePO po : executableDao.getJobs(timeStartInMillis, timeEndInMillis)) { @@ -395,7 +396,8 @@ public class ExecutableManager { ExecutableState oldStatus = ExecutableState.valueOf(jobOutput.getStatus()); if (newStatus != null && oldStatus != newStatus) { if (!ExecutableState.isValidStateTransfer(oldStatus, newStatus)) { - throw new IllegalStateTranferException("there is no valid state transfer from:" + oldStatus + " to:" + newStatus + ", job id: " + jobId); + throw new IllegalStateTranferException("there is no valid state transfer from:" + oldStatus + " to:" + + newStatus + ", job id: " + jobId); } jobOutput.setStatus(newStatus.toString()); } @@ -413,6 +415,26 @@ public class ExecutableManager { } } + public void forceKillJob(String jobId) { + try { + final ExecutableOutputPO jobOutput = executableDao.getJobOutput(jobId); + jobOutput.setStatus(ExecutableState.ERROR.toString()); + List<ExecutablePO> tasks = executableDao.getJob(jobId).getTasks(); + + for (ExecutablePO task : tasks) { + if (executableDao.getJobOutput(task.getId()).getStatus().equals("SUCCEED")) { + continue; + } else if (executableDao.getJobOutput(task.getId()).getStatus().equals("RUNNING")) { + updateJobOutput(task.getId(), ExecutableState.READY, Maps.<String, String> newHashMap(), ""); + } + break; + } + executableDao.updateJobOutput(jobOutput); + } catch (PersistentException e) { + throw new RuntimeException(e); + } + } + //for migration only //TODO delete when migration finished public void resetJobOutput(String jobId, ExecutableState state, String output) { @@ -495,7 +517,8 @@ public class ExecutableManager { } } - private AbstractExecutable parseToAbstract(ExecutablePO executablePO, Class<? extends AbstractExecutable> expectedClass) { + private AbstractExecutable parseToAbstract(ExecutablePO executablePO, + Class<? 
extends AbstractExecutable> expectedClass) { if (executablePO == null) { logger.warn("executablePO is null"); return null; http://git-wip-us.apache.org/repos/asf/kylin/blob/c0942651/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java index f5360da..ec5f552 100644 --- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java @@ -104,7 +104,7 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti nStopped++; } else { if (fetchFailed) { - executableManager.updateJobOutput(id, ExecutableState.ERROR, null, null); + executableManager.forceKillJob(id); nError++; } else { nOthers++;