minor, change the way to get DefaultScheduler's instance. minor, refine log when ZK connection is lost.
minor, remove DefaultScheduler.destroyInstance(). Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/3a3d11a3 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/3a3d11a3 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/3a3d11a3 Branch: refs/heads/master Commit: 3a3d11a319024fb7eccf26eb63f7c41891d66080 Parents: 9ff03c8 Author: tttMelody <245915...@qq.com> Authored: Wed Dec 13 14:44:52 2017 +0800 Committer: Jiatao Tao <245915...@qq.com> Committed: Thu Dec 14 18:00:18 2017 +0800 ---------------------------------------------------------------------- .../job/impl/threadpool/DefaultScheduler.java | 29 +++++--------------- .../job/impl/threadpool/BaseSchedulerTest.java | 15 +++++----- .../kylin/provision/BuildCubeWithEngine.java | 3 +- .../kylin/provision/BuildCubeWithStream.java | 3 +- 4 files changed, 16 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/3a3d11a3/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java index ec5f552..327a793 100644 --- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java @@ -62,11 +62,11 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti volatile boolean fetchFailed = false; private JobEngineConfig jobEngineConfig; - private static DefaultScheduler INSTANCE = null; + private static volatile DefaultScheduler INSTANCE = null; public DefaultScheduler() { if (INSTANCE != null) { - throw new IllegalStateException("DefaultScheduler has been 
initiated."); + throw new IllegalStateException("DefaultScheduler has been initiated. Use getInstance() instead."); } } @@ -175,14 +175,11 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti } } - public static DefaultScheduler getInstance() { - return INSTANCE; - } - @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { if ((newState == ConnectionState.SUSPENDED) || (newState == ConnectionState.LOST)) { try { + logger.info("ZK Connection state change to " + newState + ", shutdown default scheduler."); shutdown(); } catch (SchedulerException e) { throw new RuntimeException("failed to shutdown scheduler", e); @@ -190,23 +187,11 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti } } - public synchronized static DefaultScheduler createInstance() { - destroyInstance(); - INSTANCE = new DefaultScheduler(); - return INSTANCE; - } - - public synchronized static void destroyInstance() { - DefaultScheduler tmp = INSTANCE; - INSTANCE = null; - if (tmp != null) { - try { - tmp.shutdown(); - } catch (SchedulerException e) { - logger.error("error stop DefaultScheduler", e); - throw new RuntimeException(e); - } + public synchronized static DefaultScheduler getInstance() { + if (INSTANCE == null) { + INSTANCE = new DefaultScheduler(); } + return INSTANCE; } @Override http://git-wip-us.apache.org/repos/asf/kylin/blob/3a3d11a3/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java ---------------------------------------------------------------------- diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java index e0a542e..6786d31 100644 --- a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java +++ b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java @@ -29,7 +29,7 @@ import 
org.apache.kylin.job.execution.ExecutableManager; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.lock.MockJobLock; import org.junit.After; -import org.junit.Before; +import org.junit.BeforeClass; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,16 +39,16 @@ public abstract class BaseSchedulerTest extends LocalFileMetadataTestCase { private static final Logger logger = LoggerFactory.getLogger(BaseSchedulerTest.class); - private DefaultScheduler scheduler; + private static DefaultScheduler scheduler; - protected ExecutableManager jobService; + protected static ExecutableManager jobService; - @Before - public void setup() throws Exception { + @BeforeClass + public static void setup() throws Exception { System.setProperty("kylin.job.scheduler.poll-interval-second", "1"); - createTestMetadata(); + staticCreateTestMetadata(); jobService = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv()); - scheduler = DefaultScheduler.createInstance(); + scheduler = DefaultScheduler.getInstance(); scheduler.init(new JobEngineConfig(KylinConfig.getInstanceFromEnv()), new MockJobLock()); if (!scheduler.hasStarted()) { throw new RuntimeException("scheduler has not been started"); @@ -57,7 +57,6 @@ public abstract class BaseSchedulerTest extends LocalFileMetadataTestCase { @After public void after() throws Exception { - DefaultScheduler.destroyInstance(); cleanupTestMetadata(); System.clearProperty("kylin.job.scheduler.poll-interval-second"); } http://git-wip-us.apache.org/repos/asf/kylin/blob/3a3d11a3/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java index 18a07cf..a432902 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java +++ 
b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java @@ -156,7 +156,7 @@ public class BuildCubeWithEngine { final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); jobService = ExecutableManager.getInstance(kylinConfig); - scheduler = DefaultScheduler.createInstance(); + scheduler = DefaultScheduler.getInstance(); scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock()); if (!scheduler.hasStarted()) { throw new RuntimeException("scheduler has not been started"); @@ -172,7 +172,6 @@ public class BuildCubeWithEngine { } public void after() { - DefaultScheduler.destroyInstance(); } public static void afterClass() { http://git-wip-us.apache.org/repos/asf/kylin/blob/3a3d11a3/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java index bf52b97..60cea56 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java @@ -111,7 +111,7 @@ public class BuildCubeWithStream { final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); jobService = ExecutableManager.getInstance(kylinConfig); - scheduler = DefaultScheduler.createInstance(); + scheduler = DefaultScheduler.getInstance(); scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock()); if (!scheduler.hasStarted()) { throw new RuntimeException("scheduler has not been started"); @@ -306,7 +306,6 @@ public class BuildCubeWithStream { public void after() { kafkaServer.stop(); cleanKafkaZkPath(kafkaZkPath); - DefaultScheduler.destroyInstance(); } private void cleanKafkaZkPath(String path) {