KYLIN-3106 DefaultScheduler.shutdown should use ExecutorService.shutdownNow instead of ExecutorService.shutdown

fix ui


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/f2722eb7
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/f2722eb7
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/f2722eb7

Branch: refs/heads/master
Commit: f2722eb7d147735c79102e9e4bb35bf7b38d8534
Parents: 3b200a7
Author: Hongbin Ma <mahong...@apache.org>
Authored: Thu Dec 14 14:58:50 2017 +0800
Committer: Hongbin Ma <m...@kyligence.io>
Committed: Wed Dec 27 20:05:54 2017 +0800

----------------------------------------------------------------------
 .../kylin/job/execution/ExecutableManager.java  |  5 +
 .../job/impl/threadpool/DefaultScheduler.java   | 15 ++-
 .../job/FiveSecondSucceedTestExecutable.java    | 46 +++++++++
 .../job/impl/threadpool/BaseSchedulerTest.java  | 39 +++++---
 .../impl/threadpool/DefaultSchedulerTest.java   | 99 +++++++++++++++-----
 5 files changed, 160 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/f2722eb7/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
----------------------------------------------------------------------
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java 
b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index 6110573..561c38d 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -390,6 +390,11 @@ public class ExecutableManager {
     }
 
     public void updateJobOutput(String jobId, ExecutableState newStatus, 
Map<String, String> info, String output) {
+        // abort if this thread was interrupted (e.g. by a scheduler's shutdownNow()) instead of writing job output
+        if (Thread.currentThread().isInterrupted()) {
+            throw new IllegalStateException("Current thread is interrupted, aborting");
+        }
+
         try {
             final ExecutableOutputPO jobOutput = 
executableDao.getJobOutput(jobId);
             Preconditions.checkArgument(jobOutput != null, "there is no 
related output for job id:" + jobId);

http://git-wip-us.apache.org/repos/asf/kylin/blob/f2722eb7/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
----------------------------------------------------------------------
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
index 42185cc..69d85ca 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
@@ -81,7 +81,8 @@ public class DefaultScheduler implements 
Scheduler<AbstractExecutable>, Connecti
 
         @Override
         synchronized public void run() {
-            try {
+            try (SetThreadName ignored = new SetThreadName("Scheduler %s 
FetcherRunner",
+                    System.identityHashCode(DefaultScheduler.this))) {
                 // logger.debug("Job Fetcher is running...");
                 Map<String, Executable> runningJobs = context.getRunningJobs();
                 if (isJobPoolFull()) {
@@ -141,7 +142,7 @@ public class DefaultScheduler implements 
Scheduler<AbstractExecutable>, Connecti
                         + nStopped + " stopped, " + nReady + " ready, " + 
nSUCCEED + " already succeed, " + nError
                         + " error, " + nDiscarded + " discarded, " + nOthers + 
" others");
             } catch (Exception e) {
-                fetchFailed = true;
+                fetchFailed = true; // this could happen when resource store 
is unavailable
                 logger.warn("Job Fetcher caught a exception ", e);
             }
         }
@@ -258,12 +259,25 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
         logger.info("Shutting down DefaultScheduler ....");
         jobLock.unlockJobEngine();
+        boolean interrupted = false;
         try {
-            fetcherPool.shutdown();
+            // shutdownNow() interrupts running fetcher tasks; shutdown() would let them finish
+            fetcherPool.shutdownNow();
             fetcherPool.awaitTermination(1, TimeUnit.MINUTES);
-            jobPool.shutdown();
+        } catch (InterruptedException e) {
+            interrupted = true;
+            logger.warn("InterruptedException is caught!");
+        }
+        try {
+            // interrupt running jobs rather than waiting for them to complete
+            jobPool.shutdownNow();
             jobPool.awaitTermination(1, TimeUnit.MINUTES);
         } catch (InterruptedException e) {
-            //ignore it
+            interrupted = true;
+            logger.warn("InterruptedException is caught!");
         }
+        if (interrupted) {
+            // awaitTermination cleared the interrupt status when it threw; restore it for the caller
+            Thread.currentThread().interrupt();
+        }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/f2722eb7/core-job/src/test/java/org/apache/kylin/job/FiveSecondSucceedTestExecutable.java
----------------------------------------------------------------------
diff --git 
a/core-job/src/test/java/org/apache/kylin/job/FiveSecondSucceedTestExecutable.java
 
b/core-job/src/test/java/org/apache/kylin/job/FiveSecondSucceedTestExecutable.java
new file mode 100644
index 0000000..fd17370
--- /dev/null
+++ 
b/core-job/src/test/java/org/apache/kylin/job/FiveSecondSucceedTestExecutable.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job;
+
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+
+/**
+ * Test executable that sleeps for five seconds and then reports success. The delay gives a
+ * test time to shut the scheduler down while the task is still running.
+ */
+public class FiveSecondSucceedTestExecutable extends BaseTestExecutable {
+
+    /** How long {@link #doWork} sleeps, in milliseconds. */
+    private static final long SLEEP_MILLIS = 5000L;
+
+    public FiveSecondSucceedTestExecutable() {
+        super();
+    }
+
+    /**
+     * @param sleepTime currently ignored; the task always sleeps for five seconds
+     */
+    public FiveSecondSucceedTestExecutable(int sleepTime) {
+        super();
+    }
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        try {
+            Thread.sleep(SLEEP_MILLIS);
+        } catch (InterruptedException e) {
+            // restore the interrupt flag so later code on this thread can still observe it
+            Thread.currentThread().interrupt();
+        }
+        return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/f2722eb7/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
----------------------------------------------------------------------
diff --git 
a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
 
b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
index e0a542e..deaa425 100644
--- 
a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
+++ 
b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
@@ -24,6 +24,7 @@ import java.lang.reflect.Modifier;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.exception.SchedulerException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
@@ -38,16 +39,20 @@ import org.slf4j.LoggerFactory;
 public abstract class BaseSchedulerTest extends LocalFileMetadataTestCase {
 
     private static final Logger logger = 
LoggerFactory.getLogger(BaseSchedulerTest.class);
-    
-    private DefaultScheduler scheduler;
 
-    protected ExecutableManager jobService;
+    protected DefaultScheduler scheduler;
+
+    protected ExecutableManager execMgr;
 
     @Before
     public void setup() throws Exception {
         System.setProperty("kylin.job.scheduler.poll-interval-second", "1");
         createTestMetadata();
-        jobService = 
ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
+        execMgr = 
ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
+        startScheduler();
+    }
+
+    protected void startScheduler() throws SchedulerException {
         scheduler = DefaultScheduler.createInstance();
         scheduler.init(new JobEngineConfig(KylinConfig.getInstanceFromEnv()), 
new MockJobLock());
         if (!scheduler.hasStarted()) {
@@ -72,21 +77,23 @@ public abstract class BaseSchedulerTest extends 
LocalFileMetadataTestCase {
         field.set(null, newValue);
     }
 
-    protected void waitForJobFinish(String jobId) {
+    protected void waitForJobFinish(String jobId, int maxWaitTime) {
         int error = 0;
+        long start = System.currentTimeMillis();
         final int errorLimit = 3;
-        
-        while (error < errorLimit) {
+
+        while (error < errorLimit && (System.currentTimeMillis() - start < 
maxWaitTime)) {
             try {
-                Thread.sleep(2000);
+                Thread.sleep(1000);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
 
             try {
-                AbstractExecutable job = jobService.getJob(jobId);
+                AbstractExecutable job = execMgr.getJob(jobId);
                 ExecutableState status = job.getStatus();
-                if (status == ExecutableState.SUCCEED || status == 
ExecutableState.ERROR || status == ExecutableState.STOPPED || status == 
ExecutableState.DISCARDED) {
+                if (status == ExecutableState.SUCCEED || status == 
ExecutableState.ERROR
+                        || status == ExecutableState.STOPPED || status == 
ExecutableState.DISCARDED) {
                     break;
                 }
             } catch (Exception ex) {
@@ -94,15 +101,19 @@ public abstract class BaseSchedulerTest extends 
LocalFileMetadataTestCase {
                 error++;
             }
         }
-        
+
         if (error >= errorLimit) {
-            throw new RuntimeException("waitForJobFinish() encounters 
exceptions, see logs above");
+            throw new RuntimeException("too many exceptions");
+        }
+
+        if (System.currentTimeMillis() - start >= maxWaitTime) {
+            throw new RuntimeException("too long wait time");
         }
     }
 
     protected void waitForJobStatus(String jobId, ExecutableState state, long 
interval) {
         while (true) {
-            AbstractExecutable job = jobService.getJob(jobId);
+            AbstractExecutable job = execMgr.getJob(jobId);
             if (job.getStatus() == state) {
                 break;
             } else {
@@ -118,7 +129,7 @@ public abstract class BaseSchedulerTest extends 
LocalFileMetadataTestCase {
     protected void runningJobToError(String jobId) {
         while (true) {
             try {
-                AbstractExecutable job = jobService.getJob(jobId);
+                AbstractExecutable job = execMgr.getJob(jobId);
                 ExecutableState status = job.getStatus();
                 if (status == ExecutableState.RUNNING) {
                     scheduler.fetchFailed = true;

http://git-wip-us.apache.org/repos/asf/kylin/blob/f2722eb7/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
----------------------------------------------------------------------
diff --git 
a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
 
b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
index 5c51f59..63292f0 100644
--- 
a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
+++ 
b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.kylin.job.BaseTestExecutable;
 import org.apache.kylin.job.ErrorTestExecutable;
 import org.apache.kylin.job.FailedTestExecutable;
+import org.apache.kylin.job.FiveSecondSucceedTestExecutable;
 import org.apache.kylin.job.NoErrorStatusExecutable;
 import org.apache.kylin.job.SelfStopExecutable;
 import org.apache.kylin.job.SucceedTestExecutable;
@@ -37,7 +38,9 @@ import 
org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.junit.Assert;
 import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,16 +49,19 @@ import org.slf4j.LoggerFactory;
 public class DefaultSchedulerTest extends BaseSchedulerTest {
     private static final Logger logger = 
LoggerFactory.getLogger(DefaultSchedulerTest.class);
 
+    @Rule
+    public ExpectedException thrown = ExpectedException.none();
+
     @Test
     public void testSingleTaskJob() throws Exception {
         logger.info("testSingleTaskJob");
         DefaultChainedExecutable job = new DefaultChainedExecutable();
         BaseTestExecutable task1 = new SucceedTestExecutable();
         job.addTask(task1);
-        jobService.addJob(job);
-        waitForJobFinish(job.getId());
-        Assert.assertEquals(ExecutableState.SUCCEED, 
jobService.getOutput(job.getId()).getState());
-        Assert.assertEquals(ExecutableState.SUCCEED, 
jobService.getOutput(task1.getId()).getState());
+        execMgr.addJob(job);
+        waitForJobFinish(job.getId(), 10000);
+        Assert.assertEquals(ExecutableState.SUCCEED, 
execMgr.getOutput(job.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, 
execMgr.getOutput(task1.getId()).getState());
     }
 
     @Test
@@ -66,11 +72,11 @@ public class DefaultSchedulerTest extends BaseSchedulerTest 
{
         BaseTestExecutable task2 = new SucceedTestExecutable();
         job.addTask(task1);
         job.addTask(task2);
-        jobService.addJob(job);
-        waitForJobFinish(job.getId());
-        Assert.assertEquals(ExecutableState.SUCCEED, 
jobService.getOutput(job.getId()).getState());
-        Assert.assertEquals(ExecutableState.SUCCEED, 
jobService.getOutput(task1.getId()).getState());
-        Assert.assertEquals(ExecutableState.SUCCEED, 
jobService.getOutput(task2.getId()).getState());
+        execMgr.addJob(job);
+        waitForJobFinish(job.getId(), 10000);
+        Assert.assertEquals(ExecutableState.SUCCEED, 
execMgr.getOutput(job.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, 
execMgr.getOutput(task1.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, 
execMgr.getOutput(task2.getId()).getState());
     }
 
     @Test
@@ -81,11 +87,11 @@ public class DefaultSchedulerTest extends BaseSchedulerTest 
{
         BaseTestExecutable task2 = new FailedTestExecutable();
         job.addTask(task1);
         job.addTask(task2);
-        jobService.addJob(job);
-        waitForJobFinish(job.getId());
-        Assert.assertEquals(ExecutableState.ERROR, 
jobService.getOutput(job.getId()).getState());
-        Assert.assertEquals(ExecutableState.SUCCEED, 
jobService.getOutput(task1.getId()).getState());
-        Assert.assertEquals(ExecutableState.ERROR, 
jobService.getOutput(task2.getId()).getState());
+        execMgr.addJob(job);
+        waitForJobFinish(job.getId(), 10000);
+        Assert.assertEquals(ExecutableState.ERROR, 
execMgr.getOutput(job.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, 
execMgr.getOutput(task1.getId()).getState());
+        Assert.assertEquals(ExecutableState.ERROR, 
execMgr.getOutput(task2.getId()).getState());
     }
 
     @Test
@@ -96,11 +102,11 @@ public class DefaultSchedulerTest extends 
BaseSchedulerTest {
         BaseTestExecutable task2 = new SucceedTestExecutable();
         job.addTask(task1);
         job.addTask(task2);
-        jobService.addJob(job);
-        waitForJobFinish(job.getId());
-        Assert.assertEquals(ExecutableState.ERROR, 
jobService.getOutput(job.getId()).getState());
-        Assert.assertEquals(ExecutableState.ERROR, 
jobService.getOutput(task1.getId()).getState());
-        Assert.assertEquals(ExecutableState.READY, 
jobService.getOutput(task2.getId()).getState());
+        execMgr.addJob(job);
+        waitForJobFinish(job.getId(), 10000);
+        Assert.assertEquals(ExecutableState.ERROR, 
execMgr.getOutput(job.getId()).getState());
+        Assert.assertEquals(ExecutableState.ERROR, 
execMgr.getOutput(task1.getId()).getState());
+        Assert.assertEquals(ExecutableState.READY, 
execMgr.getOutput(task2.getId()).getState());
     }
 
     @Test
@@ -109,13 +115,13 @@ public class DefaultSchedulerTest extends 
BaseSchedulerTest {
         DefaultChainedExecutable job = new DefaultChainedExecutable();
         SelfStopExecutable task1 = new SelfStopExecutable();
         job.addTask(task1);
-        jobService.addJob(job);
+        execMgr.addJob(job);
         Thread.sleep(1100); // give time to launch job/task1 
         waitForJobStatus(job.getId(), ExecutableState.RUNNING, 500);
-        jobService.discardJob(job.getId());
-        waitForJobFinish(job.getId());
-        Assert.assertEquals(ExecutableState.DISCARDED, 
jobService.getOutput(job.getId()).getState());
-        Assert.assertEquals(ExecutableState.DISCARDED, 
jobService.getOutput(task1.getId()).getState());
+        execMgr.discardJob(job.getId());
+        waitForJobFinish(job.getId(), 10000);
+        Assert.assertEquals(ExecutableState.DISCARDED, 
execMgr.getOutput(job.getId()).getState());
+        Assert.assertEquals(ExecutableState.DISCARDED, 
execMgr.getOutput(task1.getId()).getState());
         task1.waitForDoWork();
     }
 
@@ -153,10 +159,51 @@ public class DefaultSchedulerTest extends 
BaseSchedulerTest {
         NoErrorStatusExecutable job = new NoErrorStatusExecutable();
         ErrorTestExecutable task = new ErrorTestExecutable();
         job.addTask(task);
-        jobService.addJob(job);
+        execMgr.addJob(job);
         Thread.sleep(2000);
         runningJobToError(job.getId());
         Thread.sleep(2000);
-        Assert.assertEquals(ExecutableState.ERROR, 
jobService.getOutput(job.getId()).getState());
+        Assert.assertEquals(ExecutableState.ERROR, 
execMgr.getOutput(job.getId()).getState());
+    }
+
+    @Test
+    public void testSchedulerStop() throws Exception {
+        logger.info("testSchedulerStop");
+
+        DefaultChainedExecutable job = new DefaultChainedExecutable();
+        BaseTestExecutable task1 = new FiveSecondSucceedTestExecutable();
+        job.addTask(task1);
+        execMgr.addJob(job);
+
+        // sleep 3s so that task1 (which sleeps 5s) is still running at shutdown
+        Thread.sleep(3000);
+        // simulate the scheduler going down while the job is running
+        scheduler.shutdown();
+
+        // with the scheduler down the job must not finish; expect only this wait to time out
+        thrown.expect(RuntimeException.class);
+        thrown.expectMessage("too long wait time");
+        waitForJobFinish(job.getId(), 6000);
+    }
+
+    @Test
+    public void testSchedulerRestart() throws Exception {
+        logger.info("testSchedulerRestart");
+
+        DefaultChainedExecutable job = new DefaultChainedExecutable();
+        BaseTestExecutable task1 = new FiveSecondSucceedTestExecutable();
+        job.addTask(task1);
+        execMgr.addJob(job);
+
+        // sleep 3s so that task1 (which sleeps 5s) is still running at shutdown
+        Thread.sleep(3000);
+        // simulate the scheduler going down while the job is running, then restart it
+        scheduler.shutdown();
+        startScheduler();
+
+        // the restarted scheduler is expected to run the job to success
+        waitForJobFinish(job.getId(), 10000);
+        Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(job.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task1.getId()).getState());
     }
 }

Reply via email to