Repository: kylin Updated Branches: refs/heads/master 83f1d248a -> 1ed861539
KYLIN-1717 Make job engine scheduler configurable Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/1ed86153 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/1ed86153 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/1ed86153 Branch: refs/heads/master Commit: 1ed8615394c0c6fe6f991e2a37d68343f90cafe8 Parents: 83f1d24 Author: shaofengshi <[email protected]> Authored: Wed May 25 14:09:17 2016 +0800 Committer: shaofengshi <[email protected]> Committed: Wed May 25 14:12:32 2016 +0800 ---------------------------------------------------------------------- build/bin/kylin.sh | 10 ++++ build/conf/kylin.properties | 16 ++++++ .../org/apache/kylin/common/KylinConfig.java | 33 ++++++++++++ .../apache/kylin/common/KylinConfigBase.java | 26 ++++++++- .../kylin/common/util/ImplementationSwitch.java | 30 +++++++---- .../java/org/apache/kylin/job/Scheduler.java | 2 + .../org/apache/kylin/job/SchedulerFactory.java | 26 +++++++++ .../job/impl/threadpool/DefaultScheduler.java | 57 ++++++++++++-------- .../kylin/job/manager/ExecutableManager.java | 15 ++++++ .../job/impl/threadpool/BaseSchedulerTest.java | 2 +- .../test_case_data/localmeta/kylin.properties | 15 ++++++ .../test_case_data/sandbox/kylin.properties | 15 ++++++ .../kylin/provision/BuildCubeWithEngine.java | 2 +- .../kylin/provision/BuildCubeWithSpark.java | 2 +- .../kylin/provision/BuildIIWithEngine.java | 2 +- .../kylin/rest/controller/JobController.java | 54 ++++++++++--------- 16 files changed, 245 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/1ed86153/build/bin/kylin.sh ---------------------------------------------------------------------- diff --git a/build/bin/kylin.sh b/build/bin/kylin.sh index 0e503ee..42789be 100644 --- a/build/bin/kylin.sh +++ b/build/bin/kylin.sh @@ -73,6 +73,15 @@ then mkdir -p ${KYLIN_HOME}/ext export HBASE_CLASSPATH=$hive_dependency:${KYLIN_HOME}/lib/*:${KYLIN_HOME}/ext/*:${HBASE_CLASSPATH} + if [ -z "$KYLIN_REST_ADDRESS" ] + then + kylin_rest_address=`hostname -f`":"`grep "<Connector port=" ${tomcat_root}/conf/server.xml |grep protocol=\"HTTP/1.1\" | cut -d '=' -f 2 | cut -d \" -f 2` + echo "KYLIN_REST_ADDRESS not found, will use ${kylin_rest_address}" + else + echo "KYLIN_REST_ADDRESS is set to: $KYLIN_REST_ADDRESS" + kylin_rest_address=$KYLIN_REST_ADDRESS + fi + #debug if encounter NoClassDefError #hbase classpath @@ -88,6 +97,7 @@ then -Djava.io.tmpdir=${tomcat_root}/temp \ -Dkylin.hive.dependency=${hive_dependency} \ -Dkylin.hbase.dependency=${hbase_dependency} \ + -Dkylin.rest.address=${kylin_rest_address} \ -Dspring.profiles.active=${spring_profile} \ org.apache.hadoop.util.RunJar ${tomcat_root}/bin/bootstrap.jar org.apache.catalina.startup.Bootstrap start >> ${KYLIN_HOME}/logs/kylin.out 2>&1 & echo $! > ${KYLIN_HOME}/pid & echo "A new Kylin instance is started by $USER, stop it using \"kylin.sh stop\"" http://git-wip-us.apache.org/repos/asf/kylin/blob/1ed86153/build/conf/kylin.properties ---------------------------------------------------------------------- diff --git a/build/conf/kylin.properties b/build/conf/kylin.properties index df2c83e..39712c1 100644 --- a/build/conf/kylin.properties +++ b/build/conf/kylin.properties @@ -15,6 +15,22 @@ # limitations under the License. # +# job engines +kylin.job.engine.0=org.apache.kylin.engine.mr.MRBatchCubingEngine +kylin.job.engine.2=org.apache.kylin.engine.mr.MRBatchCubingEngine2 + +# source engines +kylin.source.engine.0=org.apache.kylin.source.hive.HiveSource + +# storage engines +kylin.storage.engine.0=org.apache.kylin.storage.hbase.HBaseStorage +kylin.storage.engine.1=org.apache.kylin.storage.hybrid.HybridStorage +kylin.storage.engine.2=org.apache.kylin.storage.hbase.HBaseStorage + + +# schedulers +kylin.scheduler.0=org.apache.kylin.job.impl.threadpool.DefaultScheduler + # kylin server's mode kylin.server.mode=all http://git-wip-us.apache.org/repos/asf/kylin/blob/1ed86153/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java index 92a4661..43841a2 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java @@ -326,4 +326,37 @@ public class KylinConfig extends KylinConfigBase { else return this.base() == ((KylinConfig) another).base(); } + + + public static void writeOverrideProperties(Properties properties) throws IOException { + File propFile = getKylinProperties(); + File overrideFile = new File(propFile.getParentFile(), propFile.getName() + ".override"); + overrideFile.createNewFile(); + FileInputStream fis2 = null; + Properties override = new Properties(); + try { + fis2 = new FileInputStream(overrideFile); + override.load(fis2); + for (Map.Entry<Object, Object> entries : properties.entrySet()) { + override.setProperty(entries.getKey().toString(), entries.getValue().toString()); + } + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + IOUtils.closeQuietly(fis2); + } + + PrintWriter pw = null; + try { + pw = new PrintWriter(overrideFile); + for (Enumeration e = override.propertyNames(); e.hasMoreElements();) { + String key = (String) e.nextElement(); + pw.println(key + "=" + override.getProperty(key)); + } + pw.close(); + } finally { + IOUtils.closeQuietly(pw); + } + + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/1ed86153/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 0199378..f1496b5 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -683,7 +683,31 @@ abstract public class KylinConfigBase implements Serializable { r.put(2, "org.apache.kylin.storage.hbase.HBaseStorage"); return r; } - + + public Map<Integer, String> getSchedulers() { + return convertKeyToInteger(getPropertiesByPrefix("kylin.scheduler.")); + } + + public Integer getSchedulerType() { + return Integer.parseInt(getOptional("kylin.enable.scheduler", "0")); + } + + + public String getZookeeperAddress() { + return this.getOptional("kylin.zookeeper.address"); + } + + public void setZookeeperAddress(String zkAddress) { + setProperty("kylin.zookeeper.address", zkAddress); + } + + public String getRestAddress() { + return this.getOptional("kylin.rest.address", "localhost:7070"); + } + public void setRestAddress(String restAddress) { + setProperty("kylin.rest.address", restAddress); + } + private Map<Integer, String> convertKeyToInteger(Map<String, String> map) { Map<Integer, String> result = Maps.newLinkedHashMap(); for (Entry<String, String> entry : map.entrySet()) { http://git-wip-us.apache.org/repos/asf/kylin/blob/1ed86153/core-common/src/main/java/org/apache/kylin/common/util/ImplementationSwitch.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ImplementationSwitch.java b/core-common/src/main/java/org/apache/kylin/common/util/ImplementationSwitch.java index 4a47b83..873393c 100644 --- a/core-common/src/main/java/org/apache/kylin/common/util/ImplementationSwitch.java +++ b/core-common/src/main/java/org/apache/kylin/common/util/ImplementationSwitch.java @@ -19,6 +19,7 @@ package org.apache.kylin.common.util; import java.util.Map; +import com.google.common.collect.Maps; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,10 +33,12 @@ public class ImplementationSwitch<I> { final private Object[] instances; private Class<I> interfaceClz; + private Map<Integer, String> impls = Maps.newHashMap(); public ImplementationSwitch(Map<Integer, String> impls, Class<I> interfaceClz) { + this.impls.putAll(impls); this.interfaceClz = interfaceClz; - this.instances = initInstances(impls); + this.instances = initInstances(this.impls); } private Object[] initInstances(Map<Integer, String> impls) { @@ -48,22 +51,27 @@ public class ImplementationSwitch<I> { Object[] result = new Object[maxId + 1]; - for (Integer id : impls.keySet()) { - String clzName = impls.get(id); - try { - result[id] = ClassUtil.newInstance(clzName); - } catch (Exception ex) { - logger.warn("Implementation missing " + clzName + " - " + ex); - } - } - return result; } - public I get(int id) { + public synchronized I get(int id) { + String clzName = impls.get(id); + if (clzName == null) { + throw new IllegalArgumentException("Implementation class missing, ID " + id + ", interface " + interfaceClz.getName()); + } + @SuppressWarnings("unchecked") I result = (I) instances[id]; + if (result == null) { + try { + result = (I)ClassUtil.newInstance(clzName); + instances[id] = result; + } catch (Exception ex) { + logger.warn("Implementation missing " + clzName + " - " + ex); + } + } + if (result == null) throw new IllegalArgumentException("Implementations missing, ID " + id + ", interface " + interfaceClz.getName()); http://git-wip-us.apache.org/repos/asf/kylin/blob/1ed86153/core-job/src/main/java/org/apache/kylin/job/Scheduler.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/Scheduler.java b/core-job/src/main/java/org/apache/kylin/job/Scheduler.java index 2ed2fc2..93d2510 100644 --- a/core-job/src/main/java/org/apache/kylin/job/Scheduler.java +++ b/core-job/src/main/java/org/apache/kylin/job/Scheduler.java @@ -33,4 +33,6 @@ public interface Scheduler<T extends Executable> { boolean stop(T executable) throws SchedulerException; + boolean hasStarted(); + } http://git-wip-us.apache.org/repos/asf/kylin/blob/1ed86153/core-job/src/main/java/org/apache/kylin/job/SchedulerFactory.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/SchedulerFactory.java b/core-job/src/main/java/org/apache/kylin/job/SchedulerFactory.java new file mode 100644 index 0000000..0e8207f --- /dev/null +++ b/core-job/src/main/java/org/apache/kylin/job/SchedulerFactory.java @@ -0,0 +1,26 @@ +package org.apache.kylin.job; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.ImplementationSwitch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +/** + */ +public class SchedulerFactory { + + private static final Logger logger = LoggerFactory.getLogger(SchedulerFactory.class); + private static ImplementationSwitch<Scheduler> schedulers; + + static { + Map<Integer, String> impls = KylinConfig.getInstanceFromEnv().getSchedulers(); + schedulers = new ImplementationSwitch<Scheduler>(impls, Scheduler.class); + } + + public static Scheduler scheduler(int schedulerType) { + return schedulers.get(schedulerType); + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/1ed86153/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java index 5e11041..dd22234 100644 --- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java @@ -55,14 +55,17 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti private ExecutorService jobPool; private DefaultContext context; - private Logger logger = LoggerFactory.getLogger(DefaultScheduler.class); + private static final Logger logger = LoggerFactory.getLogger(DefaultScheduler.class); private volatile boolean initialized = false; private volatile boolean hasStarted = false; private JobEngineConfig jobEngineConfig; - private static final DefaultScheduler INSTANCE = new DefaultScheduler(); + private static DefaultScheduler INSTANCE = null; - private DefaultScheduler() { + public DefaultScheduler() { + if (INSTANCE != null) { + throw new IllegalStateException("DefaultScheduler has been initiated."); + } } private class FetcherRunner implements Runnable { @@ -149,8 +152,35 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti } } + public synchronized static DefaultScheduler createInstance() { + destroyInstance(); + INSTANCE = new DefaultScheduler(); + return INSTANCE; + } + + public synchronized static void destroyInstance() { + DefaultScheduler tmp = INSTANCE; + INSTANCE = null; + if (tmp != null) { + try { + tmp.shutdown(); + } catch (SchedulerException e) { + logger.error("error stop DefaultScheduler", e); + throw new RuntimeException(e); + } + } + } + + @Override public synchronized void init(JobEngineConfig jobEngineConfig, final JobLock jobLock) throws SchedulerException { + String serverMode = jobEngineConfig.getConfig().getServerMode(); + if (! ("job".equals(serverMode.toLowerCase()) || "all".equals(serverMode.toLowerCase()))) { + logger.info("server mode: " + serverMode + ", no need to run job scheduler"); + return; + } + logger.info("Initializing Job Engine ...."); + if (!initialized) { initialized = true; } else { @@ -170,25 +200,7 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti jobPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, Long.MAX_VALUE, TimeUnit.DAYS, new SynchronousQueue<Runnable>()); context = new DefaultContext(Maps.<String, Executable> newConcurrentMap(), jobEngineConfig.getConfig()); - for (AbstractExecutable executable : executableManager.getAllExecutables()) { - if (executable.getStatus() == ExecutableState.READY) { - executableManager.updateJobOutput(executable.getId(), ExecutableState.ERROR, null, "scheduler initializing work to reset job to ERROR status"); - } - } - executableManager.updateAllRunningJobsToError(); - - Runtime.getRuntime().addShutdownHook(new Thread() { - public void run() { - logger.debug("Closing zk connection"); - try { - shutdown(); - } catch (SchedulerException e) { - logger.error("error shutdown scheduler", e); - } finally { - jobLock.unlock(); - } - } - }); + executableManager.resumeAllRunningJobs(); fetcher = new FetcherRunner(); fetcherPool.scheduleAtFixedRate(fetcher, 10, ExecutableConstants.DEFAULT_SCHEDULER_INTERVAL_SECONDS, TimeUnit.SECONDS); @@ -211,6 +223,7 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti } } + @Override public boolean hasStarted() { return this.hasStarted; } http://git-wip-us.apache.org/repos/asf/kylin/blob/1ed86153/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java index 45e0521..5138fb4 100644 --- a/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java +++ b/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java @@ -223,6 +223,21 @@ public class ExecutableManager { } } + public void resumeAllRunningJobs() { + try { + final List<ExecutableOutputPO> jobOutputs = executableDao.getJobOutputs(); + for (ExecutableOutputPO executableOutputPO : jobOutputs) { + if (executableOutputPO.getStatus().equalsIgnoreCase(ExecutableState.RUNNING.toString())) { + executableOutputPO.setStatus(ExecutableState.READY.toString()); + executableDao.updateJobOutput(executableOutputPO); + } + } + } catch (PersistentException e) { + logger.error("error reset job status from RUNNING to READY", e); + throw new RuntimeException(e); + } + } + public void resumeJob(String jobId) { AbstractExecutable job = getJob(jobId); if (job == null) { http://git-wip-us.apache.org/repos/asf/kylin/blob/1ed86153/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java ---------------------------------------------------------------------- diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java index ecac973..4e092a1 100644 --- a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java +++ b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java @@ -45,7 +45,7 @@ public abstract class BaseSchedulerTest extends LocalFileMetadataTestCase { createTestMetadata(); setFinalStatic(ExecutableConstants.class.getField("DEFAULT_SCHEDULER_INTERVAL_SECONDS"), 10); jobService = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv()); - scheduler = DefaultScheduler.getInstance(); + scheduler = DefaultScheduler.createInstance(); scheduler.init(new JobEngineConfig(KylinConfig.getInstanceFromEnv()), new MockJobLock()); if (!scheduler.hasStarted()) { throw new RuntimeException("scheduler has not been started"); http://git-wip-us.apache.org/repos/asf/kylin/blob/1ed86153/examples/test_case_data/localmeta/kylin.properties ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/kylin.properties b/examples/test_case_data/localmeta/kylin.properties index 9ac7625..9612180 100644 --- a/examples/test_case_data/localmeta/kylin.properties +++ b/examples/test_case_data/localmeta/kylin.properties @@ -15,6 +15,21 @@ # limitations under the License. # +# job engines +kylin.job.engine.0=org.apache.kylin.engine.mr.MRBatchCubingEngine +kylin.job.engine.2=org.apache.kylin.engine.mr.MRBatchCubingEngine2 + +# source engines +kylin.source.engine.0=org.apache.kylin.source.hive.HiveSource + +# storage engines +kylin.storage.engine.0=org.apache.kylin.storage.hbase.HBaseStorage +kylin.storage.engine.1=org.apache.kylin.storage.hybrid.HybridStorage +kylin.storage.engine.2=org.apache.kylin.storage.hbase.HBaseStorage + +# schedulers +kylin.scheduler.0=org.apache.kylin.job.impl.threadpool.DefaultScheduler + # optional information for the owner of kylin platform, it can be your team's email # currently it will be attached to each kylin's htable attribute [email protected] http://git-wip-us.apache.org/repos/asf/kylin/blob/1ed86153/examples/test_case_data/sandbox/kylin.properties ---------------------------------------------------------------------- diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties index 423d7de..16784ca 100644 --- a/examples/test_case_data/sandbox/kylin.properties +++ b/examples/test_case_data/sandbox/kylin.properties @@ -15,6 +15,21 @@ # limitations under the License. # +# job engines +kylin.job.engine.0=org.apache.kylin.engine.mr.MRBatchCubingEngine +kylin.job.engine.2=org.apache.kylin.engine.mr.MRBatchCubingEngine2 + +# source engines +kylin.source.engine.0=org.apache.kylin.source.hive.HiveSource + +# storage engines +kylin.storage.engine.0=org.apache.kylin.storage.hbase.HBaseStorage +kylin.storage.engine.1=org.apache.kylin.storage.hybrid.HybridStorage +kylin.storage.engine.2=org.apache.kylin.storage.hbase.HBaseStorage + +# schedulers +kylin.scheduler.0=org.apache.kylin.job.impl.threadpool.DefaultScheduler + # kylin server's mode kylin.server.mode=all # optional information for the owner of kylin platform, it can be your team's email http://git-wip-us.apache.org/repos/asf/kylin/blob/1ed86153/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java index 07de5d7..249bba6 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java @@ -137,7 +137,7 @@ public class BuildCubeWithEngine { final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); jobService = ExecutableManager.getInstance(kylinConfig); - scheduler = DefaultScheduler.getInstance(); + scheduler = DefaultScheduler.createInstance(); scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock()); if (!scheduler.hasStarted()) { throw new RuntimeException("scheduler has not been started"); http://git-wip-us.apache.org/repos/asf/kylin/blob/1ed86153/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java index d862dbf..402fa6c 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java @@ -100,7 +100,7 @@ public class BuildCubeWithSpark { for (String jobId : jobService.getAllJobIds()) { jobService.deleteJob(jobId); } - scheduler = DefaultScheduler.getInstance(); + scheduler = DefaultScheduler.createInstance(); scheduler.init(new JobEngineConfig(kylinConfig), new MockJobLock()); if (!scheduler.hasStarted()) { throw new RuntimeException("scheduler has not been started"); http://git-wip-us.apache.org/repos/asf/kylin/blob/1ed86153/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java index 643b122..c76b15b 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java @@ -108,7 +108,7 @@ public class BuildIIWithEngine { final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); jobService = ExecutableManager.getInstance(kylinConfig); - scheduler = DefaultScheduler.getInstance(); + scheduler = DefaultScheduler.createInstance(); scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock()); if (!scheduler.hasStarted()) { throw new RuntimeException("scheduler has not been started"); http://git-wip-us.apache.org/repos/asf/kylin/blob/1ed86153/server/src/main/java/org/apache/kylin/rest/controller/JobController.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/controller/JobController.java b/server/src/main/java/org/apache/kylin/rest/controller/JobController.java index 5c835ac..1400c2e 100644 --- a/server/src/main/java/org/apache/kylin/rest/controller/JobController.java +++ b/server/src/main/java/org/apache/kylin/rest/controller/JobController.java @@ -28,12 +28,14 @@ import java.util.TimeZone; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.job.JobInstance; +import org.apache.kylin.job.Scheduler; +import org.apache.kylin.job.SchedulerFactory; import org.apache.kylin.job.constant.JobStatusEnum; import org.apache.kylin.job.constant.JobTimeFilterEnum; import org.apache.kylin.job.engine.JobEngineConfig; -import org.apache.kylin.job.impl.threadpool.DefaultScheduler; +import org.apache.kylin.job.exception.SchedulerException; +import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.lock.JobLock; -import org.apache.kylin.rest.constant.Constant; import org.apache.kylin.rest.exception.InternalErrorException; import org.apache.kylin.rest.request.JobListRequest; import org.apache.kylin.rest.service.JobService; @@ -48,9 +50,7 @@ import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.ResponseBody; /** - * @author ysong1 - * @author Jack - * + * */ @Controller @RequestMapping(value = "jobs") @@ -77,27 +77,33 @@ public class JobController extends BasicController implements InitializingBean { TimeZone.setDefault(tzone); final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - String serverMode = kylinConfig.getServerMode(); - - if (Constant.SERVER_MODE_JOB.equals(serverMode.toLowerCase()) || Constant.SERVER_MODE_ALL.equals(serverMode.toLowerCase())) { - logger.info("Initializing Job Engine ...."); - - new Thread(new Runnable() { - @Override - public void run() { - try { - DefaultScheduler scheduler = DefaultScheduler.getInstance(); - scheduler.init(new JobEngineConfig(kylinConfig), jobLock); - while (!scheduler.hasStarted()) { - logger.error("scheduler has not been started"); - Thread.sleep(1000); - } - } catch (Exception e) { - throw new RuntimeException(e); + final Scheduler<AbstractExecutable> scheduler = (Scheduler<AbstractExecutable>)SchedulerFactory.scheduler(kylinConfig.getSchedulerType()); + + new Thread(new Runnable() { + @Override + public void run() { + try { + scheduler.init(new JobEngineConfig(kylinConfig), jobLock); + while (!scheduler.hasStarted()) { + logger.error("scheduler has not been started"); + Thread.sleep(1000); } + } catch (Exception e) { + throw new RuntimeException(e); } - }).start(); - } + } + }).start(); + + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + @Override + public void run() { + try { + scheduler.shutdown(); + } catch (SchedulerException e) { + logger.error("error occurred to shutdown scheduler", e); + } + } + })); } /**
