Repository: kylin
Updated Branches:
  refs/heads/master 83f1d248a -> 1ed861539


KYLIN-1717 Make job engine scheduler configurable

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/1ed86153
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/1ed86153
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/1ed86153

Branch: refs/heads/master
Commit: 1ed8615394c0c6fe6f991e2a37d68343f90cafe8
Parents: 83f1d24
Author: shaofengshi <[email protected]>
Authored: Wed May 25 14:09:17 2016 +0800
Committer: shaofengshi <[email protected]>
Committed: Wed May 25 14:12:32 2016 +0800

----------------------------------------------------------------------
 build/bin/kylin.sh                              | 10 ++++
 build/conf/kylin.properties                     | 16 ++++++
 .../org/apache/kylin/common/KylinConfig.java    | 33 ++++++++++++
 .../apache/kylin/common/KylinConfigBase.java    | 26 ++++++++-
 .../kylin/common/util/ImplementationSwitch.java | 30 +++++++----
 .../java/org/apache/kylin/job/Scheduler.java    |  2 +
 .../org/apache/kylin/job/SchedulerFactory.java  | 26 +++++++++
 .../job/impl/threadpool/DefaultScheduler.java   | 57 ++++++++++++--------
 .../kylin/job/manager/ExecutableManager.java    | 15 ++++++
 .../job/impl/threadpool/BaseSchedulerTest.java  |  2 +-
 .../test_case_data/localmeta/kylin.properties   | 15 ++++++
 .../test_case_data/sandbox/kylin.properties     | 15 ++++++
 .../kylin/provision/BuildCubeWithEngine.java    |  2 +-
 .../kylin/provision/BuildCubeWithSpark.java     |  2 +-
 .../kylin/provision/BuildIIWithEngine.java      |  2 +-
 .../kylin/rest/controller/JobController.java    | 54 ++++++++++---------
 16 files changed, 245 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/1ed86153/build/bin/kylin.sh
----------------------------------------------------------------------
diff --git a/build/bin/kylin.sh b/build/bin/kylin.sh
index 0e503ee..42789be 100644
--- a/build/bin/kylin.sh
+++ b/build/bin/kylin.sh
@@ -73,6 +73,15 @@ then
     mkdir -p ${KYLIN_HOME}/ext
     export 
HBASE_CLASSPATH=$hive_dependency:${KYLIN_HOME}/lib/*:${KYLIN_HOME}/ext/*:${HBASE_CLASSPATH}
 
+    if [ -z "$KYLIN_REST_ADDRESS" ]
+    then
+        kylin_rest_address=`hostname -f`":"`grep "<Connector port=" 
${tomcat_root}/conf/server.xml |grep protocol=\"HTTP/1.1\" | cut -d '=' -f 2 | 
cut -d \" -f 2`
+        echo "KYLIN_REST_ADDRESS not found, will use ${kylin_rest_address}"
+    else
+        echo "KYLIN_REST_ADDRESS is set to: $KYLIN_REST_ADDRESS"
+        kylin_rest_address=$KYLIN_REST_ADDRESS
+    fi
+
     #debug if encounter NoClassDefError
     #hbase classpath
 
@@ -88,6 +97,7 @@ then
     -Djava.io.tmpdir=${tomcat_root}/temp  \
     -Dkylin.hive.dependency=${hive_dependency} \
     -Dkylin.hbase.dependency=${hbase_dependency} \
+    -Dkylin.rest.address=${kylin_rest_address} \
     -Dspring.profiles.active=${spring_profile} \
     org.apache.hadoop.util.RunJar ${tomcat_root}/bin/bootstrap.jar  
org.apache.catalina.startup.Bootstrap start >> ${KYLIN_HOME}/logs/kylin.out 
2>&1 & echo $! > ${KYLIN_HOME}/pid &
     echo "A new Kylin instance is started by $USER, stop it using \"kylin.sh 
stop\""

http://git-wip-us.apache.org/repos/asf/kylin/blob/1ed86153/build/conf/kylin.properties
----------------------------------------------------------------------
diff --git a/build/conf/kylin.properties b/build/conf/kylin.properties
index df2c83e..39712c1 100644
--- a/build/conf/kylin.properties
+++ b/build/conf/kylin.properties
@@ -15,6 +15,22 @@
 # limitations under the License.
 #
 
+# job engines
+kylin.job.engine.0=org.apache.kylin.engine.mr.MRBatchCubingEngine
+kylin.job.engine.2=org.apache.kylin.engine.mr.MRBatchCubingEngine2
+
+# source engines
+kylin.source.engine.0=org.apache.kylin.source.hive.HiveSource
+
+# storage engines
+kylin.storage.engine.0=org.apache.kylin.storage.hbase.HBaseStorage
+kylin.storage.engine.1=org.apache.kylin.storage.hybrid.HybridStorage
+kylin.storage.engine.2=org.apache.kylin.storage.hbase.HBaseStorage
+
+
+# schedulers
+kylin.scheduler.0=org.apache.kylin.job.impl.threadpool.DefaultScheduler
+
 # kylin server's mode
 kylin.server.mode=all
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/1ed86153/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java 
b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index 92a4661..43841a2 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -326,4 +326,37 @@ public class KylinConfig extends KylinConfigBase {
         else
             return this.base() == ((KylinConfig) another).base();
     }
+
+
+    public static void writeOverrideProperties(Properties properties) throws 
IOException {
+        File propFile = getKylinProperties();
+        File overrideFile = new File(propFile.getParentFile(), 
propFile.getName() + ".override");
+        overrideFile.createNewFile();
+        FileInputStream fis2 = null;
+        Properties override = new Properties();
+        try {
+            fis2 = new FileInputStream(overrideFile);
+            override.load(fis2);
+            for (Map.Entry<Object, Object> entries : properties.entrySet()) {
+                override.setProperty(entries.getKey().toString(), 
entries.getValue().toString());
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            IOUtils.closeQuietly(fis2);
+        }
+
+        PrintWriter pw = null;
+        try {
+            pw = new PrintWriter(overrideFile);
+            for (Enumeration e = override.propertyNames(); 
e.hasMoreElements();) {
+                String key = (String) e.nextElement();
+                pw.println(key + "=" + override.getProperty(key));
+            }
+            pw.close();
+        } finally {
+            IOUtils.closeQuietly(pw);
+        }
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/1ed86153/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 0199378..f1496b5 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -683,7 +683,31 @@ abstract public class KylinConfigBase implements 
Serializable {
         r.put(2, "org.apache.kylin.storage.hbase.HBaseStorage");
         return r;
     }
-    
+
+    public Map<Integer, String> getSchedulers() {
+        return convertKeyToInteger(getPropertiesByPrefix("kylin.scheduler."));
+    }
+
+    public Integer getSchedulerType() {
+        return Integer.parseInt(getOptional("kylin.enable.scheduler", "0"));
+    }
+
+
+    public String getZookeeperAddress() {
+        return this.getOptional("kylin.zookeeper.address");
+    }
+
+    public void setZookeeperAddress(String zkAddress) {
+        setProperty("kylin.zookeeper.address", zkAddress);
+    }
+
+    public String getRestAddress() {
+        return this.getOptional("kylin.rest.address", "localhost:7070");
+    }
+    public void setRestAddress(String restAddress) {
+        setProperty("kylin.rest.address", restAddress);
+    }
+
     private Map<Integer, String> convertKeyToInteger(Map<String, String> map) {
         Map<Integer, String> result = Maps.newLinkedHashMap();
         for (Entry<String, String> entry : map.entrySet()) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/1ed86153/core-common/src/main/java/org/apache/kylin/common/util/ImplementationSwitch.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/util/ImplementationSwitch.java
 
b/core-common/src/main/java/org/apache/kylin/common/util/ImplementationSwitch.java
index 4a47b83..873393c 100644
--- 
a/core-common/src/main/java/org/apache/kylin/common/util/ImplementationSwitch.java
+++ 
b/core-common/src/main/java/org/apache/kylin/common/util/ImplementationSwitch.java
@@ -19,6 +19,7 @@ package org.apache.kylin.common.util;
 
 import java.util.Map;
 
+import com.google.common.collect.Maps;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,10 +33,12 @@ public class ImplementationSwitch<I> {
 
     final private Object[] instances;
     private Class<I> interfaceClz;
+    private Map<Integer, String> impls = Maps.newHashMap();
 
     public ImplementationSwitch(Map<Integer, String> impls, Class<I> 
interfaceClz) {
+        this.impls.putAll(impls);
         this.interfaceClz = interfaceClz;
-        this.instances = initInstances(impls);
+        this.instances = initInstances(this.impls);
     }
 
     private Object[] initInstances(Map<Integer, String> impls) {
@@ -48,22 +51,27 @@ public class ImplementationSwitch<I> {
 
         Object[] result = new Object[maxId + 1];
 
-        for (Integer id : impls.keySet()) {
-            String clzName = impls.get(id);
-            try {
-                result[id] = ClassUtil.newInstance(clzName);
-            } catch (Exception ex) {
-                logger.warn("Implementation missing " + clzName + " - " + ex);
-            }
-        }
-
         return result;
     }
 
-    public I get(int id) {
+    public synchronized I get(int id) {
+        String clzName = impls.get(id);
+        if (clzName == null) {
+            throw new IllegalArgumentException("Implementation class missing, 
ID " + id + ", interface " + interfaceClz.getName());
+        }
+
         @SuppressWarnings("unchecked")
         I result = (I) instances[id];
 
+        if (result == null) {
+            try {
+                result = (I)ClassUtil.newInstance(clzName);
+                instances[id] = result;
+            } catch (Exception ex) {
+                logger.warn("Implementation missing " + clzName + " - " + ex);
+            }
+        }
+
         if (result == null)
             throw new IllegalArgumentException("Implementations missing, ID " 
+ id + ", interface " + interfaceClz.getName());
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/1ed86153/core-job/src/main/java/org/apache/kylin/job/Scheduler.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/Scheduler.java 
b/core-job/src/main/java/org/apache/kylin/job/Scheduler.java
index 2ed2fc2..93d2510 100644
--- a/core-job/src/main/java/org/apache/kylin/job/Scheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/Scheduler.java
@@ -33,4 +33,6 @@ public interface Scheduler<T extends Executable> {
 
     boolean stop(T executable) throws SchedulerException;
 
+    boolean hasStarted();
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/1ed86153/core-job/src/main/java/org/apache/kylin/job/SchedulerFactory.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/SchedulerFactory.java 
b/core-job/src/main/java/org/apache/kylin/job/SchedulerFactory.java
new file mode 100644
index 0000000..0e8207f
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/SchedulerFactory.java
@@ -0,0 +1,26 @@
+package org.apache.kylin.job;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ImplementationSwitch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ */
+public class SchedulerFactory {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(SchedulerFactory.class);
+    private static ImplementationSwitch<Scheduler> schedulers;
+
+    static {
+        Map<Integer, String> impls = 
KylinConfig.getInstanceFromEnv().getSchedulers();
+        schedulers = new ImplementationSwitch<Scheduler>(impls, 
Scheduler.class);
+    }
+
+    public static Scheduler scheduler(int schedulerType) {
+        return schedulers.get(schedulerType);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/1ed86153/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
----------------------------------------------------------------------
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
index 5e11041..dd22234 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
@@ -55,14 +55,17 @@ public class DefaultScheduler implements 
Scheduler<AbstractExecutable>, Connecti
     private ExecutorService jobPool;
     private DefaultContext context;
 
-    private Logger logger = LoggerFactory.getLogger(DefaultScheduler.class);
+    private static final Logger logger = 
LoggerFactory.getLogger(DefaultScheduler.class);
     private volatile boolean initialized = false;
     private volatile boolean hasStarted = false;
     private JobEngineConfig jobEngineConfig;
 
-    private static final DefaultScheduler INSTANCE = new DefaultScheduler();
+    private static DefaultScheduler INSTANCE = null;
 
-    private DefaultScheduler() {
+    public DefaultScheduler() {
+        if (INSTANCE != null) {
+            throw new IllegalStateException("DefaultScheduler has been 
initiated.");
+        }
     }
 
     private class FetcherRunner implements Runnable {
@@ -149,8 +152,35 @@ public class DefaultScheduler implements 
Scheduler<AbstractExecutable>, Connecti
         }
     }
 
+    public synchronized static DefaultScheduler createInstance() {
+        destroyInstance();
+        INSTANCE = new DefaultScheduler();
+        return INSTANCE;
+    }
+
+    public synchronized static void destroyInstance() {
+        DefaultScheduler tmp = INSTANCE;
+        INSTANCE = null;
+        if (tmp != null) {
+            try {
+                tmp.shutdown();
+            } catch (SchedulerException e) {
+                logger.error("error stop DefaultScheduler", e);
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+
     @Override
     public synchronized void init(JobEngineConfig jobEngineConfig, final 
JobLock jobLock) throws SchedulerException {
+        String serverMode = jobEngineConfig.getConfig().getServerMode();
+        if (! ("job".equals(serverMode.toLowerCase()) || 
"all".equals(serverMode.toLowerCase()))) {
+            logger.info("server mode: " + serverMode + ", no need to run job 
scheduler");
+            return;
+        }
+        logger.info("Initializing Job Engine ....");
+
         if (!initialized) {
             initialized = true;
         } else {
@@ -170,25 +200,7 @@ public class DefaultScheduler implements 
Scheduler<AbstractExecutable>, Connecti
         jobPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, 
Long.MAX_VALUE, TimeUnit.DAYS, new SynchronousQueue<Runnable>());
         context = new DefaultContext(Maps.<String, Executable> 
newConcurrentMap(), jobEngineConfig.getConfig());
 
-        for (AbstractExecutable executable : 
executableManager.getAllExecutables()) {
-            if (executable.getStatus() == ExecutableState.READY) {
-                executableManager.updateJobOutput(executable.getId(), 
ExecutableState.ERROR, null, "scheduler initializing work to reset job to ERROR 
status");
-            }
-        }
-        executableManager.updateAllRunningJobsToError();
-
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            public void run() {
-                logger.debug("Closing zk connection");
-                try {
-                    shutdown();
-                } catch (SchedulerException e) {
-                    logger.error("error shutdown scheduler", e);
-                } finally {
-                    jobLock.unlock();
-                }
-            }
-        });
+        executableManager.resumeAllRunningJobs();
 
         fetcher = new FetcherRunner();
         fetcherPool.scheduleAtFixedRate(fetcher, 10, 
ExecutableConstants.DEFAULT_SCHEDULER_INTERVAL_SECONDS, TimeUnit.SECONDS);
@@ -211,6 +223,7 @@ public class DefaultScheduler implements 
Scheduler<AbstractExecutable>, Connecti
         }
     }
 
+    @Override
     public boolean hasStarted() {
         return this.hasStarted;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/1ed86153/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
----------------------------------------------------------------------
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java 
b/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
index 45e0521..5138fb4 100644
--- a/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
@@ -223,6 +223,21 @@ public class ExecutableManager {
         }
     }
 
+    public void resumeAllRunningJobs() {
+        try {
+            final List<ExecutableOutputPO> jobOutputs = 
executableDao.getJobOutputs();
+            for (ExecutableOutputPO executableOutputPO : jobOutputs) {
+                if 
(executableOutputPO.getStatus().equalsIgnoreCase(ExecutableState.RUNNING.toString()))
 {
+                    
executableOutputPO.setStatus(ExecutableState.READY.toString());
+                    executableDao.updateJobOutput(executableOutputPO);
+                }
+            }
+        } catch (PersistentException e) {
+            logger.error("error reset job status from RUNNING to READY", e);
+            throw new RuntimeException(e);
+        }
+    }
+
     public void resumeJob(String jobId) {
         AbstractExecutable job = getJob(jobId);
         if (job == null) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/1ed86153/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
----------------------------------------------------------------------
diff --git 
a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
 
b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
index ecac973..4e092a1 100644
--- 
a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
+++ 
b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
@@ -45,7 +45,7 @@ public abstract class BaseSchedulerTest extends 
LocalFileMetadataTestCase {
         createTestMetadata();
         
setFinalStatic(ExecutableConstants.class.getField("DEFAULT_SCHEDULER_INTERVAL_SECONDS"),
 10);
         jobService = 
ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
-        scheduler = DefaultScheduler.getInstance();
+        scheduler = DefaultScheduler.createInstance();
         scheduler.init(new JobEngineConfig(KylinConfig.getInstanceFromEnv()), 
new MockJobLock());
         if (!scheduler.hasStarted()) {
             throw new RuntimeException("scheduler has not been started");

http://git-wip-us.apache.org/repos/asf/kylin/blob/1ed86153/examples/test_case_data/localmeta/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kylin.properties 
b/examples/test_case_data/localmeta/kylin.properties
index 9ac7625..9612180 100644
--- a/examples/test_case_data/localmeta/kylin.properties
+++ b/examples/test_case_data/localmeta/kylin.properties
@@ -15,6 +15,21 @@
 # limitations under the License.
 #
 
+# job engines
+kylin.job.engine.0=org.apache.kylin.engine.mr.MRBatchCubingEngine
+kylin.job.engine.2=org.apache.kylin.engine.mr.MRBatchCubingEngine2
+
+# source engines
+kylin.source.engine.0=org.apache.kylin.source.hive.HiveSource
+
+# storage engines
+kylin.storage.engine.0=org.apache.kylin.storage.hbase.HBaseStorage
+kylin.storage.engine.1=org.apache.kylin.storage.hybrid.HybridStorage
+kylin.storage.engine.2=org.apache.kylin.storage.hbase.HBaseStorage
+
+# schedulers
+kylin.scheduler.0=org.apache.kylin.job.impl.threadpool.DefaultScheduler
+
 # optional information for the owner of kylin platform, it can be your team's 
email
 # currently it will be attached to each kylin's htable attribute
 [email protected]

http://git-wip-us.apache.org/repos/asf/kylin/blob/1ed86153/examples/test_case_data/sandbox/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin.properties 
b/examples/test_case_data/sandbox/kylin.properties
index 423d7de..16784ca 100644
--- a/examples/test_case_data/sandbox/kylin.properties
+++ b/examples/test_case_data/sandbox/kylin.properties
@@ -15,6 +15,21 @@
 # limitations under the License.
 #
 
+# job engines
+kylin.job.engine.0=org.apache.kylin.engine.mr.MRBatchCubingEngine
+kylin.job.engine.2=org.apache.kylin.engine.mr.MRBatchCubingEngine2
+
+# source engines
+kylin.source.engine.0=org.apache.kylin.source.hive.HiveSource
+
+# storage engines
+kylin.storage.engine.0=org.apache.kylin.storage.hbase.HBaseStorage
+kylin.storage.engine.1=org.apache.kylin.storage.hybrid.HybridStorage
+kylin.storage.engine.2=org.apache.kylin.storage.hbase.HBaseStorage
+
+# schedulers
+kylin.scheduler.0=org.apache.kylin.job.impl.threadpool.DefaultScheduler
+
 # kylin server's mode
 kylin.server.mode=all
 # optional information for the owner of kylin platform, it can be your team's 
email

http://git-wip-us.apache.org/repos/asf/kylin/blob/1ed86153/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
----------------------------------------------------------------------
diff --git 
a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java 
b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
index 07de5d7..249bba6 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
@@ -137,7 +137,7 @@ public class BuildCubeWithEngine {
 
         final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         jobService = ExecutableManager.getInstance(kylinConfig);
-        scheduler = DefaultScheduler.getInstance();
+        scheduler = DefaultScheduler.createInstance();
         scheduler.init(new JobEngineConfig(kylinConfig), new 
ZookeeperJobLock());
         if (!scheduler.hasStarted()) {
             throw new RuntimeException("scheduler has not been started");

http://git-wip-us.apache.org/repos/asf/kylin/blob/1ed86153/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java
----------------------------------------------------------------------
diff --git 
a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java 
b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java
index d862dbf..402fa6c 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java
@@ -100,7 +100,7 @@ public class BuildCubeWithSpark {
         for (String jobId : jobService.getAllJobIds()) {
             jobService.deleteJob(jobId);
         }
-        scheduler = DefaultScheduler.getInstance();
+        scheduler = DefaultScheduler.createInstance();
         scheduler.init(new JobEngineConfig(kylinConfig), new MockJobLock());
         if (!scheduler.hasStarted()) {
             throw new RuntimeException("scheduler has not been started");

http://git-wip-us.apache.org/repos/asf/kylin/blob/1ed86153/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java
----------------------------------------------------------------------
diff --git 
a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java 
b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java
index 643b122..c76b15b 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java
@@ -108,7 +108,7 @@ public class BuildIIWithEngine {
 
         final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         jobService = ExecutableManager.getInstance(kylinConfig);
-        scheduler = DefaultScheduler.getInstance();
+        scheduler = DefaultScheduler.createInstance();
         scheduler.init(new JobEngineConfig(kylinConfig), new 
ZookeeperJobLock());
         if (!scheduler.hasStarted()) {
             throw new RuntimeException("scheduler has not been started");

http://git-wip-us.apache.org/repos/asf/kylin/blob/1ed86153/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
----------------------------------------------------------------------
diff --git 
a/server/src/main/java/org/apache/kylin/rest/controller/JobController.java 
b/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
index 5c835ac..1400c2e 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
@@ -28,12 +28,14 @@ import java.util.TimeZone;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.job.JobInstance;
+import org.apache.kylin.job.Scheduler;
+import org.apache.kylin.job.SchedulerFactory;
 import org.apache.kylin.job.constant.JobStatusEnum;
 import org.apache.kylin.job.constant.JobTimeFilterEnum;
 import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
+import org.apache.kylin.job.exception.SchedulerException;
+import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.lock.JobLock;
-import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.rest.exception.InternalErrorException;
 import org.apache.kylin.rest.request.JobListRequest;
 import org.apache.kylin.rest.service.JobService;
@@ -48,9 +50,7 @@ import org.springframework.web.bind.annotation.RequestMethod;
 import org.springframework.web.bind.annotation.ResponseBody;
 
 /**
- * @author ysong1
- * @author Jack
- * 
+ *
  */
 @Controller
 @RequestMapping(value = "jobs")
@@ -77,27 +77,33 @@ public class JobController extends BasicController 
implements InitializingBean {
         TimeZone.setDefault(tzone);
 
         final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        String serverMode = kylinConfig.getServerMode();
-
-        if (Constant.SERVER_MODE_JOB.equals(serverMode.toLowerCase()) || 
Constant.SERVER_MODE_ALL.equals(serverMode.toLowerCase())) {
-            logger.info("Initializing Job Engine ....");
-
-            new Thread(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        DefaultScheduler scheduler = 
DefaultScheduler.getInstance();
-                        scheduler.init(new JobEngineConfig(kylinConfig), 
jobLock);
-                        while (!scheduler.hasStarted()) {
-                            logger.error("scheduler has not been started");
-                            Thread.sleep(1000);
-                        }
-                    } catch (Exception e) {
-                        throw new RuntimeException(e);
+        final Scheduler<AbstractExecutable> scheduler = 
(Scheduler<AbstractExecutable>)SchedulerFactory.scheduler(kylinConfig.getSchedulerType());
+
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    scheduler.init(new JobEngineConfig(kylinConfig), jobLock);
+                    while (!scheduler.hasStarted()) {
+                        logger.error("scheduler has not been started");
+                        Thread.sleep(1000);
                     }
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
                 }
-            }).start();
-        }
+            }
+        }).start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    scheduler.shutdown();
+                } catch (SchedulerException e) {
+                    logger.error("error occurred to shutdown scheduler", e);
+                }
+            }
+        }));
     }
 
     /**

Reply via email to