KYLIN-1189 resume running jobs when the job engine fails over
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/362e1559 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/362e1559 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/362e1559 Branch: refs/heads/helix-rebase Commit: 362e155982b8139d03a0d3735e82cde460555189 Parents: 41efb68 Author: shaofengshi <shaofeng...@apache.org> Authored: Thu Dec 3 10:32:40 2015 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Wed Mar 2 17:24:10 2016 +0800 ---------------------------------------------------------------------- .../job/impl/threadpool/DefaultScheduler.java | 7 +----- .../kylin/job/manager/ExecutableManager.java | 16 +++++++++++++ .../kylin/rest/service/CacheServiceTest.java | 25 ++++++++++++++++++++ 3 files changed, 42 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/362e1559/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java index 417e279..2915c60 100644 --- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java @@ -170,12 +170,7 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti jobPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, Long.MAX_VALUE, TimeUnit.DAYS, new SynchronousQueue<Runnable>()); context = new DefaultContext(Maps.<String, Executable> newConcurrentMap(), jobEngineConfig.getConfig()); - for (AbstractExecutable executable : executableManager.getAllExecutables()) { - if (executable.getStatus() == ExecutableState.READY) { - 
executableManager.updateJobOutput(executable.getId(), ExecutableState.ERROR, null, "scheduler initializing work to reset job to ERROR status"); - } - } - executableManager.updateAllRunningJobsToError(); + executableManager.resumeAllRunningJobs(); Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { http://git-wip-us.apache.org/repos/asf/kylin/blob/362e1559/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java index 3effbe7..4d03389 100644 --- a/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java +++ b/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java @@ -207,6 +207,7 @@ public class ExecutableManager { } } + @Deprecated public void updateAllRunningJobsToError() { try { final List<ExecutableOutputPO> jobOutputs = executableDao.getJobOutputs(); @@ -222,6 +223,21 @@ public class ExecutableManager { } } + public void resumeAllRunningJobs() { + try { + final List<ExecutableOutputPO> jobOutputs = executableDao.getJobOutputs(); + for (ExecutableOutputPO executableOutputPO : jobOutputs) { + if (executableOutputPO.getStatus().equalsIgnoreCase(ExecutableState.RUNNING.toString())) { + executableOutputPO.setStatus(ExecutableState.READY.toString()); + executableDao.updateJobOutput(executableOutputPO); + } + } + } catch (PersistentException e) { + logger.error("error reset job status from RUNNING to READY", e); + throw new RuntimeException(e); + } + } + public void resumeJob(String jobId) { AbstractExecutable job = getJob(jobId); if (job == null) { http://git-wip-us.apache.org/repos/asf/kylin/blob/362e1559/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java ---------------------------------------------------------------------- diff --git 
a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java index 25b131a..4449d2b 100644 --- a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java +++ b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java @@ -20,11 +20,15 @@ package org.apache.kylin.rest.service; import static org.junit.Assert.*; +import java.io.File; import java.util.Arrays; import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; +import org.I0Itec.zkclient.IDefaultNameSpace; +import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.ZkServer; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.restclient.Broadcaster; import org.apache.kylin.common.util.LocalFileMetadataTestCase; @@ -60,6 +64,8 @@ public class CacheServiceTest extends LocalFileMetadataTestCase { private static Server server; + private static String ZK_ADDRESS = "localhost:2199"; + private static KylinConfig configA; private static KylinConfig configB; @@ -70,10 +76,13 @@ public class CacheServiceTest extends LocalFileMetadataTestCase { @BeforeClass public static void beforeClass() throws Exception { staticCreateTestMetadata(); + startZookeeper(); configA = KylinConfig.getInstanceFromEnv(); configA.setProperty("kylin.rest.servers", "localhost:7070"); + configA.setProperty("kylin.zookeeper.address", ZK_ADDRESS); configB = KylinConfig.getKylinConfigFromInputStream(KylinConfig.getKylinPropertiesAsInputSteam()); configB.setProperty("kylin.rest.servers", "localhost:7070"); + configB.setProperty("kylin.zookeeper.address", ZK_ADDRESS); configB.setMetadataUrl("../examples/test_metadata"); server = new Server(7070); @@ -356,4 +365,20 @@ public class CacheServiceTest extends LocalFileMetadataTestCase { } return false; } + + + public static void startZookeeper() { + logger.info("STARTING Zookeeper at " + ZK_ADDRESS); + IDefaultNameSpace 
defaultNameSpace = new IDefaultNameSpace() { + @Override + public void createDefaultNameSpace(ZkClient zkClient) { + } + }; + new File("/tmp/helix-quickstart").mkdirs(); + // start zookeeper + ZkServer server = + new ZkServer("/tmp/helix-quickstart/dataDir", "/tmp/helix-quickstart/logDir", + defaultNameSpace, 2199); + server.start(); + } }