Revert "minor, change the way that get DefaultScheduler's instance."
This reverts commit 6cf61248f93f57bc6aa8b01f8898de3eb0bf3d3c. fix ut Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/6c31ca92 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/6c31ca92 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/6c31ca92 Branch: refs/heads/master Commit: 6c31ca9235a84df588806bc51b108d921db18e1e Parents: 63de91d Author: Hongbin Ma <mahong...@apache.org> Authored: Sat Dec 23 20:23:19 2017 +0800 Committer: Hongbin Ma <m...@kyligence.io> Committed: Wed Dec 27 20:05:54 2017 +0800 ---------------------------------------------------------------------- .../job/impl/threadpool/DefaultScheduler.java | 34 +++++++++++++++----- .../job/impl/threadpool/BaseSchedulerTest.java | 15 +++++---- .../kylin/provision/BuildCubeWithEngine.java | 3 +- .../kylin/provision/BuildCubeWithStream.java | 3 +- 4 files changed, 38 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/6c31ca92/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java index c8ab7ea..42185cc 100644 --- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java @@ -62,14 +62,21 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti volatile boolean fetchFailed = false; private JobEngineConfig jobEngineConfig; - private static volatile DefaultScheduler INSTANCE = null; + private static DefaultScheduler INSTANCE = null; public DefaultScheduler() { if (INSTANCE != null) { - throw new IllegalStateException("DefaultScheduler has been initiated. Use getInstance() instead."); + throw new IllegalStateException("DefaultScheduler has been initiated."); } } + public static DefaultScheduler getInstance() { + if (INSTANCE == null) { + INSTANCE = createInstance(); + } + return INSTANCE; + } + private class FetcherRunner implements Runnable { @Override @@ -170,7 +177,7 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti } finally { context.removeRunningJob(executable); } - + // trigger the next step asap fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS); } @@ -180,7 +187,6 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti public void stateChanged(CuratorFramework client, ConnectionState newState) { if ((newState == ConnectionState.SUSPENDED) || (newState == ConnectionState.LOST)) { try { - logger.info("ZK Connection state change to " + newState + ", shutdown default scheduler."); shutdown(); } catch (SchedulerException e) { throw new RuntimeException("failed to shutdown scheduler", e); @@ -188,13 +194,25 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti } } - public synchronized static DefaultScheduler getInstance() { - if (INSTANCE == null) { - INSTANCE = new DefaultScheduler(); - } + public synchronized static DefaultScheduler createInstance() { + destroyInstance(); + INSTANCE = new DefaultScheduler(); return INSTANCE; } + public synchronized static void destroyInstance() { + DefaultScheduler tmp = INSTANCE; + INSTANCE = null; + if (tmp != null) { + try { + tmp.shutdown(); + } catch (SchedulerException e) { + logger.error("error stop DefaultScheduler", e); + throw new RuntimeException(e); + } + } + } + @Override public synchronized void init(JobEngineConfig jobEngineConfig, JobLock lock) throws SchedulerException { jobLock = lock; http://git-wip-us.apache.org/repos/asf/kylin/blob/6c31ca92/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java ---------------------------------------------------------------------- diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java index 6786d31..e0a542e 100644 --- a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java +++ b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java @@ -29,7 +29,7 @@ import org.apache.kylin.job.execution.ExecutableManager; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.lock.MockJobLock; import org.junit.After; -import org.junit.BeforeClass; +import org.junit.Before; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,16 +39,16 @@ public abstract class BaseSchedulerTest extends LocalFileMetadataTestCase { private static final Logger logger = LoggerFactory.getLogger(BaseSchedulerTest.class); - private static DefaultScheduler scheduler; + private DefaultScheduler scheduler; - protected static ExecutableManager jobService; + protected ExecutableManager jobService; - @BeforeClass - public static void setup() throws Exception { + @Before + public void setup() throws Exception { System.setProperty("kylin.job.scheduler.poll-interval-second", "1"); - staticCreateTestMetadata(); + createTestMetadata(); jobService = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv()); - scheduler = DefaultScheduler.getInstance(); + scheduler = DefaultScheduler.createInstance(); scheduler.init(new JobEngineConfig(KylinConfig.getInstanceFromEnv()), new MockJobLock()); if (!scheduler.hasStarted()) { throw new RuntimeException("scheduler has not been started"); @@ -57,6 +57,7 @@ public abstract class BaseSchedulerTest extends LocalFileMetadataTestCase { @After public void after() throws Exception { + DefaultScheduler.destroyInstance(); cleanupTestMetadata(); System.clearProperty("kylin.job.scheduler.poll-interval-second"); } http://git-wip-us.apache.org/repos/asf/kylin/blob/6c31ca92/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java index a432902..18a07cf 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java @@ -156,7 +156,7 @@ public class BuildCubeWithEngine { final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); jobService = ExecutableManager.getInstance(kylinConfig); - scheduler = DefaultScheduler.getInstance(); + scheduler = DefaultScheduler.createInstance(); scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock()); if (!scheduler.hasStarted()) { throw new RuntimeException("scheduler has not been started"); @@ -172,6 +172,7 @@ public class BuildCubeWithEngine { } public void after() { + DefaultScheduler.destroyInstance(); } public static void afterClass() { http://git-wip-us.apache.org/repos/asf/kylin/blob/6c31ca92/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java index bf614af..f9277bc 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java @@ -111,7 +111,7 @@ public class BuildCubeWithStream { final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); jobService = ExecutableManager.getInstance(kylinConfig); - scheduler = DefaultScheduler.getInstance(); + scheduler = DefaultScheduler.createInstance(); scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock()); if (!scheduler.hasStarted()) { throw new RuntimeException("scheduler has not been started"); @@ -306,6 +306,7 @@ public class BuildCubeWithStream { public void after() { kafkaServer.stop(); cleanKafkaZkPath(kafkaZkPath); + DefaultScheduler.destroyInstance(); } private void cleanKafkaZkPath(String path) {