Fix job status not changing to error after an HBase Region Server is shut down

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/1e359901
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/1e359901
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/1e359901

Branch: refs/heads/master
Commit: 1e35990184815c3ed0f655542d8c9af118560dae
Parents: 4629d84
Author: nichunen <chunen...@kyligence.io>
Authored: Tue Dec 12 22:21:10 2017 +0800
Committer: Ni Chunen <chunen...@kyligence.io>
Committed: Tue Dec 12 22:43:24 2017 +0800

----------------------------------------------------------------------
 .../job/impl/threadpool/DefaultScheduler.java   | 11 +++++-
 .../kylin/job/NoErrorStatusExecutable.java      | 37 ++++++++++++++++++++
 .../job/impl/threadpool/BaseSchedulerTest.java  | 16 +++++++++
 .../impl/threadpool/DefaultSchedulerTest.java   | 14 ++++++++
 4 files changed, 77 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/1e359901/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
----------------------------------------------------------------------
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
index b87a839..f5360da 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
@@ -59,6 +59,7 @@ public class DefaultScheduler implements 
Scheduler<AbstractExecutable>, Connecti
     private static final Logger logger = 
LoggerFactory.getLogger(DefaultScheduler.class);
     private volatile boolean initialized = false;
     private volatile boolean hasStarted = false;
+    volatile boolean fetchFailed = false;
     private JobEngineConfig jobEngineConfig;
 
     private static DefaultScheduler INSTANCE = null;
@@ -102,7 +103,12 @@ public class DefaultScheduler implements 
Scheduler<AbstractExecutable>, Connecti
                         } else if (output.getState() == 
ExecutableState.STOPPED) {
                             nStopped++;
                         } else {
-                            nOthers++;
+                            if (fetchFailed) {
+                                executableManager.updateJobOutput(id, 
ExecutableState.ERROR, null, null);
+                                nError++;
+                            } else {
+                                nOthers++;
+                            }
                         }
                         continue;
                     }
@@ -122,10 +128,13 @@ public class DefaultScheduler implements 
Scheduler<AbstractExecutable>, Connecti
                         logger.warn(jobDesc + " fail to schedule", ex);
                     }
                 }
+
+                fetchFailed = false;
                 logger.info("Job Fetcher: " + nRunning + " should running, " + 
runningJobs.size() + " actual running, "
                         + nStopped + " stopped, " + nReady + " ready, " + 
nSUCCEED + " already succeed, " + nError
                         + " error, " + nDiscarded + " discarded, " + nOthers + 
" others");
             } catch (Exception e) {
+                fetchFailed = true;
                 logger.warn("Job Fetcher caught a exception ", e);
             }
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/1e359901/core-job/src/test/java/org/apache/kylin/job/NoErrorStatusExecutable.java
----------------------------------------------------------------------
diff --git 
a/core-job/src/test/java/org/apache/kylin/job/NoErrorStatusExecutable.java 
b/core-job/src/test/java/org/apache/kylin/job/NoErrorStatusExecutable.java
new file mode 100644
index 0000000..a203eb7
--- /dev/null
+++ b/core-job/src/test/java/org/apache/kylin/job/NoErrorStatusExecutable.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job;
+
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+
+/** Test-only chained executable whose {@code onExecuteError} override does nothing;
+ * it is instantiated by DefaultSchedulerTest#tesMetaStoreRecover. */
+public class NoErrorStatusExecutable extends DefaultChainedExecutable {
+
+    public NoErrorStatusExecutable() {
+        super();
+    }
+
+    @Override
+    protected void onExecuteError(Throwable exception, ExecutableContext 
executableContext) {
+        return; // intentionally a no-op: both arguments are ignored. NOTE(review): presumably this suppresses the parent's error handling - confirm
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/1e359901/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
----------------------------------------------------------------------
diff --git 
a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
 
b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
index 39c5f29..e0a542e 100644
--- 
a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
+++ 
b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
@@ -114,4 +114,20 @@ public abstract class BaseSchedulerTest extends 
LocalFileMetadataTestCase {
             }
         }
     }
+
+    protected void runningJobToError(String jobId) { // polls the job until it reports RUNNING, then raises scheduler.fetchFailed and returns
+        while (true) { // no timeout: loops until the RUNNING state is observed
+            try {
+                AbstractExecutable job = jobService.getJob(jobId);
+                ExecutableState status = job.getStatus();
+                if (status == ExecutableState.RUNNING) {
+                    scheduler.fetchFailed = true; // simulates a failed fetch; the DefaultScheduler fetcher checks this flag before counting a job as "others"
+                    break;
+                }
+                Thread.sleep(1000); // wait one second between polls
+            } catch (Exception ex) { // NOTE(review): also catches InterruptedException from sleep and keeps looping; a failure before sleep retries immediately
+                logger.error("", ex);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/1e359901/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
----------------------------------------------------------------------
diff --git 
a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
 
b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
index c8b251d..5c51f59 100644
--- 
a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
+++ 
b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.kylin.job.BaseTestExecutable;
 import org.apache.kylin.job.ErrorTestExecutable;
 import org.apache.kylin.job.FailedTestExecutable;
+import org.apache.kylin.job.NoErrorStatusExecutable;
 import org.apache.kylin.job.SelfStopExecutable;
 import org.apache.kylin.job.SucceedTestExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
@@ -145,4 +146,17 @@ public class DefaultSchedulerTest extends 
BaseSchedulerTest {
         assertFalse("countDownLatch2 should NOT reach zero in 15 secs", 
countDownLatch2.await(7, TimeUnit.SECONDS));
         assertFalse("future2 should has been stopped", future2.cancel(true));
     }
+
+    @Test
+    public void tesMetaStoreRecover() throws Exception { // expects the job's output state to be ERROR after scheduler.fetchFailed has been forced on
+        logger.info("tesMetaStoreRecover");
+        NoErrorStatusExecutable job = new NoErrorStatusExecutable(); // job whose onExecuteError override is a no-op
+        ErrorTestExecutable task = new ErrorTestExecutable();
+        job.addTask(task);
+        jobService.addJob(job);
+        Thread.sleep(2000); // presumably gives the scheduler time to pick the job up - TODO confirm against the fetch interval
+        runningJobToError(job.getId()); // blocks until the job reports RUNNING, then sets scheduler.fetchFailed
+        Thread.sleep(2000); // presumably waits for a subsequent fetch cycle to update the job output - TODO confirm
+        Assert.assertEquals(ExecutableState.ERROR, 
jobService.getOutput(job.getId()).getState());
+    }
 }

Reply via email to