A running task's status should be set to READY after metastore recovery

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c0942651
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c0942651
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c0942651

Branch: refs/heads/master
Commit: c09426515918b53495811ec0de6b3a21cb49f383
Parents: 05610de
Author: nichunen <chunen...@kyligence.io>
Authored: Wed Dec 13 16:14:29 2017 +0800
Committer: Hongbin Ma <m...@kyligence.io>
Committed: Wed Dec 13 17:23:48 2017 +0800

----------------------------------------------------------------------
 .../kylin/job/execution/ExecutableManager.java  | 31 +++++++++++++++++---
 .../job/impl/threadpool/DefaultScheduler.java   |  2 +-
 2 files changed, 28 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/c0942651/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
----------------------------------------------------------------------
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java 
b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index 9f67a2b..6110573 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -61,7 +61,7 @@ public class ExecutableManager {
     }
 
     // 
============================================================================
-    
+
     private final KylinConfig config;
     private final ExecutableDao executableDao;
 
@@ -223,7 +223,8 @@ public class ExecutableManager {
      * @param expectedClass
      * @return
      */
-    public List<AbstractExecutable> getAllAbstractExecutables(long 
timeStartInMillis, long timeEndInMillis, Class<? extends AbstractExecutable> 
expectedClass) {
+    public List<AbstractExecutable> getAllAbstractExecutables(long 
timeStartInMillis, long timeEndInMillis,
+            Class<? extends AbstractExecutable> expectedClass) {
         try {
             List<AbstractExecutable> ret = Lists.newArrayList();
             for (ExecutablePO po : executableDao.getJobs(timeStartInMillis, 
timeEndInMillis)) {
@@ -395,7 +396,8 @@ public class ExecutableManager {
             ExecutableState oldStatus = 
ExecutableState.valueOf(jobOutput.getStatus());
             if (newStatus != null && oldStatus != newStatus) {
                 if (!ExecutableState.isValidStateTransfer(oldStatus, 
newStatus)) {
-                    throw new IllegalStateTranferException("there is no valid 
state transfer from:" + oldStatus + " to:" + newStatus + ", job id: " + jobId);
+                    throw new IllegalStateTranferException("there is no valid 
state transfer from:" + oldStatus + " to:"
+                            + newStatus + ", job id: " + jobId);
                 }
                 jobOutput.setStatus(newStatus.toString());
             }
@@ -413,6 +415,26 @@ public class ExecutableManager {
         }
     }
 
+    public void forceKillJob(String jobId) {
+        try {
+            final ExecutableOutputPO jobOutput = 
executableDao.getJobOutput(jobId);
+            jobOutput.setStatus(ExecutableState.ERROR.toString());
+            List<ExecutablePO> tasks = executableDao.getJob(jobId).getTasks();
+
+            for (ExecutablePO task : tasks) {
+                if 
(executableDao.getJobOutput(task.getId()).getStatus().equals("SUCCEED")) {
+                    continue;
+                } else if 
(executableDao.getJobOutput(task.getId()).getStatus().equals("RUNNING")) {
+                    updateJobOutput(task.getId(), ExecutableState.READY, 
Maps.<String, String> newHashMap(), "");
+                }
+                break;
+            }
+            executableDao.updateJobOutput(jobOutput);
+        } catch (PersistentException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     //for migration only
     //TODO delete when migration finished
     public void resetJobOutput(String jobId, ExecutableState state, String 
output) {
@@ -495,7 +517,8 @@ public class ExecutableManager {
         }
     }
 
-    private AbstractExecutable parseToAbstract(ExecutablePO executablePO, 
Class<? extends AbstractExecutable> expectedClass) {
+    private AbstractExecutable parseToAbstract(ExecutablePO executablePO,
+            Class<? extends AbstractExecutable> expectedClass) {
         if (executablePO == null) {
             logger.warn("executablePO is null");
             return null;

http://git-wip-us.apache.org/repos/asf/kylin/blob/c0942651/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
----------------------------------------------------------------------
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
index f5360da..ec5f552 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
@@ -104,7 +104,7 @@ public class DefaultScheduler implements 
Scheduler<AbstractExecutable>, Connecti
                             nStopped++;
                         } else {
                             if (fetchFailed) {
-                                executableManager.updateJobOutput(id, 
ExecutableState.ERROR, null, null);
+                                executableManager.forceKillJob(id);
                                 nError++;
                             } else {
                                 nOthers++;

Reply via email to