KYLIN-3106 DefaultScheduler.shutdown should use ExecutorService.shutdownNow instead of ExecutorService.shutdown
fix ui Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/f2722eb7 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/f2722eb7 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/f2722eb7 Branch: refs/heads/master Commit: f2722eb7d147735c79102e9e4bb35bf7b38d8534 Parents: 3b200a7 Author: Hongbin Ma <mahong...@apache.org> Authored: Thu Dec 14 14:58:50 2017 +0800 Committer: Hongbin Ma <m...@kyligence.io> Committed: Wed Dec 27 20:05:54 2017 +0800 ---------------------------------------------------------------------- .../kylin/job/execution/ExecutableManager.java | 5 + .../job/impl/threadpool/DefaultScheduler.java | 15 ++- .../job/FiveSecondSucceedTestExecutable.java | 46 +++++++++ .../job/impl/threadpool/BaseSchedulerTest.java | 39 +++++--- .../impl/threadpool/DefaultSchedulerTest.java | 99 +++++++++++++++----- 5 files changed, 160 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/f2722eb7/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java index 6110573..561c38d 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java @@ -390,6 +390,11 @@ public class ExecutableManager { } public void updateJobOutput(String jobId, ExecutableState newStatus, Map<String, String> info, String output) { + // when + if (Thread.currentThread().isInterrupted()) { + throw new RuntimeException("Current thread is interruptted, aborting"); + } + try { final ExecutableOutputPO jobOutput = executableDao.getJobOutput(jobId); Preconditions.checkArgument(jobOutput != null, "there is no related output for job id:" + jobId); http://git-wip-us.apache.org/repos/asf/kylin/blob/f2722eb7/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java index 42185cc..69d85ca 100644 --- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java @@ -81,7 +81,8 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti @Override synchronized public void run() { - try { + try (SetThreadName ignored = new SetThreadName("Scheduler %s FetcherRunner", + System.identityHashCode(DefaultScheduler.this))) { // logger.debug("Job Fetcher is running..."); Map<String, Executable> runningJobs = context.getRunningJobs(); if (isJobPoolFull()) { @@ -141,7 +142,7 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti + nStopped + " stopped, " + nReady + " ready, " + nSUCCEED + " already succeed, " + nError + " error, " + nDiscarded + " discarded, " + nOthers + " others"); } catch (Exception e) { - fetchFailed = true; + fetchFailed = true; // this could happen when resource store is unavailable logger.warn("Job Fetcher caught a exception ", e); } } @@ -258,12 +259,18 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti logger.info("Shutting down DefaultScheduler ...."); jobLock.unlockJobEngine(); try { - fetcherPool.shutdown(); + fetcherPool.shutdownNow();//interrupt fetcherPool.awaitTermination(1, TimeUnit.MINUTES); - jobPool.shutdown(); + } catch (InterruptedException e) { + //ignore it + logger.warn("InterruptedException is caught!"); + } + try { + jobPool.shutdownNow();//interrupt jobPool.awaitTermination(1, TimeUnit.MINUTES); } catch (InterruptedException e) { //ignore it + logger.warn("InterruptedException is caught!"); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/f2722eb7/core-job/src/test/java/org/apache/kylin/job/FiveSecondSucceedTestExecutable.java ---------------------------------------------------------------------- diff --git a/core-job/src/test/java/org/apache/kylin/job/FiveSecondSucceedTestExecutable.java b/core-job/src/test/java/org/apache/kylin/job/FiveSecondSucceedTestExecutable.java new file mode 100644 index 0000000..fd17370 --- /dev/null +++ b/core-job/src/test/java/org/apache/kylin/job/FiveSecondSucceedTestExecutable.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.job; + +import org.apache.kylin.job.exception.ExecuteException; +import org.apache.kylin.job.execution.ExecutableContext; +import org.apache.kylin.job.execution.ExecuteResult; + +/** + */ +public class FiveSecondSucceedTestExecutable extends BaseTestExecutable { + + public FiveSecondSucceedTestExecutable() { + super(); + } + + public FiveSecondSucceedTestExecutable(int sleepTime) { + super(); + } + + @Override + protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed"); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/f2722eb7/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java ---------------------------------------------------------------------- diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java index e0a542e..deaa425 100644 --- a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java +++ b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java @@ -24,6 +24,7 @@ import java.lang.reflect.Modifier; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.apache.kylin.job.engine.JobEngineConfig; +import org.apache.kylin.job.exception.SchedulerException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableManager; import org.apache.kylin.job.execution.ExecutableState; @@ -38,16 +39,20 @@ import org.slf4j.LoggerFactory; public abstract class BaseSchedulerTest extends LocalFileMetadataTestCase { private static final Logger logger = LoggerFactory.getLogger(BaseSchedulerTest.class); - - private DefaultScheduler scheduler; - protected ExecutableManager jobService; + protected DefaultScheduler scheduler; + + protected ExecutableManager execMgr; @Before public void setup() throws Exception { System.setProperty("kylin.job.scheduler.poll-interval-second", "1"); createTestMetadata(); - jobService = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv()); + execMgr = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv()); + startScheduler(); + } + + protected void startScheduler() throws SchedulerException { scheduler = DefaultScheduler.createInstance(); scheduler.init(new JobEngineConfig(KylinConfig.getInstanceFromEnv()), new MockJobLock()); if (!scheduler.hasStarted()) { @@ -72,21 +77,23 @@ public abstract class BaseSchedulerTest extends LocalFileMetadataTestCase { field.set(null, newValue); } - protected void waitForJobFinish(String jobId) { + protected void waitForJobFinish(String jobId, int maxWaitTime) { int error = 0; + long start = System.currentTimeMillis(); final int errorLimit = 3; - - while (error < errorLimit) { + + while (error < errorLimit && (System.currentTimeMillis() - start < maxWaitTime)) { try { - Thread.sleep(2000); + Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } try { - AbstractExecutable job = jobService.getJob(jobId); + AbstractExecutable job = execMgr.getJob(jobId); ExecutableState status = job.getStatus(); - if (status == ExecutableState.SUCCEED || status == ExecutableState.ERROR || status == ExecutableState.STOPPED || status == ExecutableState.DISCARDED) { + if (status == ExecutableState.SUCCEED || status == ExecutableState.ERROR + || status == ExecutableState.STOPPED || status == ExecutableState.DISCARDED) { break; } } catch (Exception ex) { @@ -94,15 +101,19 @@ public abstract class BaseSchedulerTest extends LocalFileMetadataTestCase { error++; } } - + if (error >= errorLimit) { - throw new RuntimeException("waitForJobFinish() encounters exceptions, see logs above"); + throw new RuntimeException("too many exceptions"); + } + + if (System.currentTimeMillis() - start >= maxWaitTime) { + throw new RuntimeException("too long wait time"); } } protected void waitForJobStatus(String jobId, ExecutableState state, long interval) { while (true) { - AbstractExecutable job = jobService.getJob(jobId); + AbstractExecutable job = execMgr.getJob(jobId); if (job.getStatus() == state) { break; } else { @@ -118,7 +129,7 @@ public abstract class BaseSchedulerTest extends LocalFileMetadataTestCase { protected void runningJobToError(String jobId) { while (true) { try { - AbstractExecutable job = jobService.getJob(jobId); + AbstractExecutable job = execMgr.getJob(jobId); ExecutableState status = job.getStatus(); if (status == ExecutableState.RUNNING) { scheduler.fetchFailed = true; http://git-wip-us.apache.org/repos/asf/kylin/blob/f2722eb7/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java ---------------------------------------------------------------------- diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java index 5c51f59..63292f0 100644 --- a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java +++ b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java @@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit; import org.apache.kylin.job.BaseTestExecutable; import org.apache.kylin.job.ErrorTestExecutable; import org.apache.kylin.job.FailedTestExecutable; +import org.apache.kylin.job.FiveSecondSucceedTestExecutable; import org.apache.kylin.job.NoErrorStatusExecutable; import org.apache.kylin.job.SelfStopExecutable; import org.apache.kylin.job.SucceedTestExecutable; @@ -37,7 +38,9 @@ import org.apache.kylin.job.execution.DefaultChainedExecutable; import org.apache.kylin.job.execution.ExecutableState; import org.junit.Assert; import org.junit.Ignore; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,16 +49,19 @@ import org.slf4j.LoggerFactory; public class DefaultSchedulerTest extends BaseSchedulerTest { private static final Logger logger = LoggerFactory.getLogger(DefaultSchedulerTest.class); + @Rule + public ExpectedException thrown = ExpectedException.none(); + @Test public void testSingleTaskJob() throws Exception { logger.info("testSingleTaskJob"); DefaultChainedExecutable job = new DefaultChainedExecutable(); BaseTestExecutable task1 = new SucceedTestExecutable(); job.addTask(task1); - jobService.addJob(job); - waitForJobFinish(job.getId()); - Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState()); - Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState()); + execMgr.addJob(job); + waitForJobFinish(job.getId(), 10000); + Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(job.getId()).getState()); + Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task1.getId()).getState()); } @Test @@ -66,11 +72,11 @@ public class DefaultSchedulerTest extends BaseSchedulerTest { BaseTestExecutable task2 = new SucceedTestExecutable(); job.addTask(task1); job.addTask(task2); - jobService.addJob(job); - waitForJobFinish(job.getId()); - Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState()); - Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState()); - Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task2.getId()).getState()); + execMgr.addJob(job); + waitForJobFinish(job.getId(), 10000); + Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(job.getId()).getState()); + Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task1.getId()).getState()); + Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task2.getId()).getState()); } @Test @@ -81,11 +87,11 @@ public class DefaultSchedulerTest extends BaseSchedulerTest { BaseTestExecutable task2 = new FailedTestExecutable(); job.addTask(task1); job.addTask(task2); - jobService.addJob(job); - waitForJobFinish(job.getId()); - Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState()); - Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState()); - Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(task2.getId()).getState()); + execMgr.addJob(job); + waitForJobFinish(job.getId(), 10000); + Assert.assertEquals(ExecutableState.ERROR, execMgr.getOutput(job.getId()).getState()); + Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task1.getId()).getState()); + Assert.assertEquals(ExecutableState.ERROR, execMgr.getOutput(task2.getId()).getState()); } @Test @@ -96,11 +102,11 @@ public class DefaultSchedulerTest extends BaseSchedulerTest { BaseTestExecutable task2 = new SucceedTestExecutable(); job.addTask(task1); job.addTask(task2); - jobService.addJob(job); - waitForJobFinish(job.getId()); - Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState()); - Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(task1.getId()).getState()); - Assert.assertEquals(ExecutableState.READY, jobService.getOutput(task2.getId()).getState()); + execMgr.addJob(job); + waitForJobFinish(job.getId(), 10000); + Assert.assertEquals(ExecutableState.ERROR, execMgr.getOutput(job.getId()).getState()); + Assert.assertEquals(ExecutableState.ERROR, execMgr.getOutput(task1.getId()).getState()); + Assert.assertEquals(ExecutableState.READY, execMgr.getOutput(task2.getId()).getState()); } @Test @@ -109,13 +115,13 @@ public class DefaultSchedulerTest extends BaseSchedulerTest { DefaultChainedExecutable job = new DefaultChainedExecutable(); SelfStopExecutable task1 = new SelfStopExecutable(); job.addTask(task1); - jobService.addJob(job); + execMgr.addJob(job); Thread.sleep(1100); // give time to launch job/task1 waitForJobStatus(job.getId(), ExecutableState.RUNNING, 500); - jobService.discardJob(job.getId()); - waitForJobFinish(job.getId()); - Assert.assertEquals(ExecutableState.DISCARDED, jobService.getOutput(job.getId()).getState()); - Assert.assertEquals(ExecutableState.DISCARDED, jobService.getOutput(task1.getId()).getState()); + execMgr.discardJob(job.getId()); + waitForJobFinish(job.getId(), 10000); + Assert.assertEquals(ExecutableState.DISCARDED, execMgr.getOutput(job.getId()).getState()); + Assert.assertEquals(ExecutableState.DISCARDED, execMgr.getOutput(task1.getId()).getState()); task1.waitForDoWork(); } @@ -153,10 +159,51 @@ public class DefaultSchedulerTest extends BaseSchedulerTest { NoErrorStatusExecutable job = new NoErrorStatusExecutable(); ErrorTestExecutable task = new ErrorTestExecutable(); job.addTask(task); - jobService.addJob(job); + execMgr.addJob(job); Thread.sleep(2000); runningJobToError(job.getId()); Thread.sleep(2000); - Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState()); + Assert.assertEquals(ExecutableState.ERROR, execMgr.getOutput(job.getId()).getState()); + } + + @Test + public void testSchedulerStop() throws Exception { + logger.info("testSchedulerStop"); + + thrown.expect(RuntimeException.class); + thrown.expectMessage("too long wait time"); + + DefaultChainedExecutable job = new DefaultChainedExecutable(); + BaseTestExecutable task1 = new FiveSecondSucceedTestExecutable(); + job.addTask(task1); + execMgr.addJob(job); + + //sleep 3s to make sure SucceedTestExecutable is running + Thread.sleep(3000); + //scheduler failed due to some reason + scheduler.shutdown(); + + waitForJobFinish(job.getId(), 6000); + } + + @Test + public void testSchedulerRestart() throws Exception { + logger.info("testSchedulerRestart"); + + DefaultChainedExecutable job = new DefaultChainedExecutable(); + BaseTestExecutable task1 = new FiveSecondSucceedTestExecutable(); + job.addTask(task1); + execMgr.addJob(job); + + //sleep 3s to make sure SucceedTestExecutable is running + Thread.sleep(3000); + //scheduler failed due to some reason + scheduler.shutdown(); + //restart + startScheduler(); + + waitForJobFinish(job.getId(), 10000); + Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(job.getId()).getState()); + Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task1.getId()).getState()); } }