minor, change the way that get DefaultScheduler's instance.

minor, refine log when lost ZK connection.

minor, remove DefaultScheduler.destroyInstance().


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/3a3d11a3
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/3a3d11a3
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/3a3d11a3

Branch: refs/heads/master
Commit: 3a3d11a319024fb7eccf26eb63f7c41891d66080
Parents: 9ff03c8
Author: tttMelody <245915...@qq.com>
Authored: Wed Dec 13 14:44:52 2017 +0800
Committer: Jiatao Tao <245915...@qq.com>
Committed: Thu Dec 14 18:00:18 2017 +0800

----------------------------------------------------------------------
 .../job/impl/threadpool/DefaultScheduler.java   | 29 +++++---------------
 .../job/impl/threadpool/BaseSchedulerTest.java  | 15 +++++-----
 .../kylin/provision/BuildCubeWithEngine.java    |  3 +-
 .../kylin/provision/BuildCubeWithStream.java    |  3 +-
 4 files changed, 16 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/3a3d11a3/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
----------------------------------------------------------------------
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
index ec5f552..327a793 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
@@ -62,11 +62,11 @@ public class DefaultScheduler implements 
Scheduler<AbstractExecutable>, Connecti
     volatile boolean fetchFailed = false;
     private JobEngineConfig jobEngineConfig;
 
-    private static DefaultScheduler INSTANCE = null;
+    private static volatile DefaultScheduler INSTANCE = null;
 
     public DefaultScheduler() {
         if (INSTANCE != null) {
-            throw new IllegalStateException("DefaultScheduler has been 
initiated.");
+            throw new IllegalStateException("DefaultScheduler has been 
initiated. Use getInstance() instead.");
         }
     }
 
@@ -175,14 +175,11 @@ public class DefaultScheduler implements 
Scheduler<AbstractExecutable>, Connecti
         }
     }
 
-    public static DefaultScheduler getInstance() {
-        return INSTANCE;
-    }
-
     @Override
     public void stateChanged(CuratorFramework client, ConnectionState 
newState) {
         if ((newState == ConnectionState.SUSPENDED) || (newState == 
ConnectionState.LOST)) {
             try {
+                logger.info("ZK Connection state change to " + newState + ", 
shutdown default scheduler.");
                 shutdown();
             } catch (SchedulerException e) {
                 throw new RuntimeException("failed to shutdown scheduler", e);
@@ -190,23 +187,11 @@ public class DefaultScheduler implements 
Scheduler<AbstractExecutable>, Connecti
         }
     }
 
-    public synchronized static DefaultScheduler createInstance() {
-        destroyInstance();
-        INSTANCE = new DefaultScheduler();
-        return INSTANCE;
-    }
-
-    public synchronized static void destroyInstance() {
-        DefaultScheduler tmp = INSTANCE;
-        INSTANCE = null;
-        if (tmp != null) {
-            try {
-                tmp.shutdown();
-            } catch (SchedulerException e) {
-                logger.error("error stop DefaultScheduler", e);
-                throw new RuntimeException(e);
-            }
+    public synchronized static DefaultScheduler getInstance() {
+        if (INSTANCE == null) {
+            INSTANCE = new DefaultScheduler();
         }
+        return INSTANCE;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kylin/blob/3a3d11a3/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
----------------------------------------------------------------------
diff --git 
a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
 
b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
index e0a542e..6786d31 100644
--- 
a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
+++ 
b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
@@ -29,7 +29,7 @@ import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.lock.MockJobLock;
 import org.junit.After;
-import org.junit.Before;
+import org.junit.BeforeClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,16 +39,16 @@ public abstract class BaseSchedulerTest extends 
LocalFileMetadataTestCase {
 
     private static final Logger logger = 
LoggerFactory.getLogger(BaseSchedulerTest.class);
     
-    private DefaultScheduler scheduler;
+    private static DefaultScheduler scheduler;
 
-    protected ExecutableManager jobService;
+    protected static ExecutableManager jobService;
 
-    @Before
-    public void setup() throws Exception {
+    @BeforeClass
+    public static void setup() throws Exception {
         System.setProperty("kylin.job.scheduler.poll-interval-second", "1");
-        createTestMetadata();
+        staticCreateTestMetadata();
         jobService = 
ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
-        scheduler = DefaultScheduler.createInstance();
+        scheduler = DefaultScheduler.getInstance();
         scheduler.init(new JobEngineConfig(KylinConfig.getInstanceFromEnv()), 
new MockJobLock());
         if (!scheduler.hasStarted()) {
             throw new RuntimeException("scheduler has not been started");
@@ -57,7 +57,6 @@ public abstract class BaseSchedulerTest extends 
LocalFileMetadataTestCase {
 
     @After
     public void after() throws Exception {
-        DefaultScheduler.destroyInstance();
         cleanupTestMetadata();
         System.clearProperty("kylin.job.scheduler.poll-interval-second");
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/3a3d11a3/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
----------------------------------------------------------------------
diff --git 
a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java 
b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
index 18a07cf..a432902 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
@@ -156,7 +156,7 @@ public class BuildCubeWithEngine {
 
         final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         jobService = ExecutableManager.getInstance(kylinConfig);
-        scheduler = DefaultScheduler.createInstance();
+        scheduler = DefaultScheduler.getInstance();
         scheduler.init(new JobEngineConfig(kylinConfig), new 
ZookeeperJobLock());
         if (!scheduler.hasStarted()) {
             throw new RuntimeException("scheduler has not been started");
@@ -172,7 +172,6 @@ public class BuildCubeWithEngine {
     }
 
     public void after() {
-        DefaultScheduler.destroyInstance();
     }
 
     public static void afterClass() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/3a3d11a3/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git 
a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java 
b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index bf52b97..60cea56 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -111,7 +111,7 @@ public class BuildCubeWithStream {
 
         final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         jobService = ExecutableManager.getInstance(kylinConfig);
-        scheduler = DefaultScheduler.createInstance();
+        scheduler = DefaultScheduler.getInstance();
         scheduler.init(new JobEngineConfig(kylinConfig), new 
ZookeeperJobLock());
         if (!scheduler.hasStarted()) {
             throw new RuntimeException("scheduler has not been started");
@@ -306,7 +306,6 @@ public class BuildCubeWithStream {
     public void after() {
         kafkaServer.stop();
         cleanKafkaZkPath(kafkaZkPath);
-        DefaultScheduler.destroyInstance();
     }
 
     private void cleanKafkaZkPath(String path) {

Reply via email to