Fix job status not changing to ERROR after an HBase Region Server is shut down
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/1e359901 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/1e359901 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/1e359901 Branch: refs/heads/master Commit: 1e35990184815c3ed0f655542d8c9af118560dae Parents: 4629d84 Author: nichunen <chunen...@kyligence.io> Authored: Tue Dec 12 22:21:10 2017 +0800 Committer: Ni Chunen <chunen...@kyligence.io> Committed: Tue Dec 12 22:43:24 2017 +0800 ---------------------------------------------------------------------- .../job/impl/threadpool/DefaultScheduler.java | 11 +++++- .../kylin/job/NoErrorStatusExecutable.java | 37 ++++++++++++++++++++ .../job/impl/threadpool/BaseSchedulerTest.java | 16 +++++++++ .../impl/threadpool/DefaultSchedulerTest.java | 14 ++++++++ 4 files changed, 77 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/1e359901/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java index b87a839..f5360da 100644 --- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java @@ -59,6 +59,7 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti private static final Logger logger = LoggerFactory.getLogger(DefaultScheduler.class); private volatile boolean initialized = false; private volatile boolean hasStarted = false; + volatile boolean fetchFailed = false; private JobEngineConfig jobEngineConfig; private static DefaultScheduler INSTANCE = null; @@ -102,7 +103,12 @@ public class 
DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti } else if (output.getState() == ExecutableState.STOPPED) { nStopped++; } else { - nOthers++; + if (fetchFailed) { + executableManager.updateJobOutput(id, ExecutableState.ERROR, null, null); + nError++; + } else { + nOthers++; + } } continue; } @@ -122,10 +128,13 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti logger.warn(jobDesc + " fail to schedule", ex); } } + + fetchFailed = false; logger.info("Job Fetcher: " + nRunning + " should running, " + runningJobs.size() + " actual running, " + nStopped + " stopped, " + nReady + " ready, " + nSUCCEED + " already succeed, " + nError + " error, " + nDiscarded + " discarded, " + nOthers + " others"); } catch (Exception e) { + fetchFailed = true; logger.warn("Job Fetcher caught a exception ", e); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/1e359901/core-job/src/test/java/org/apache/kylin/job/NoErrorStatusExecutable.java ---------------------------------------------------------------------- diff --git a/core-job/src/test/java/org/apache/kylin/job/NoErrorStatusExecutable.java b/core-job/src/test/java/org/apache/kylin/job/NoErrorStatusExecutable.java new file mode 100644 index 0000000..a203eb7 --- /dev/null +++ b/core-job/src/test/java/org/apache/kylin/job/NoErrorStatusExecutable.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.job; + +import org.apache.kylin.job.execution.DefaultChainedExecutable; +import org.apache.kylin.job.execution.ExecutableContext; + +/** + */ +public class NoErrorStatusExecutable extends DefaultChainedExecutable { + + public NoErrorStatusExecutable() { + super(); + } + + @Override + protected void onExecuteError(Throwable exception, ExecutableContext executableContext) { + return; + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/1e359901/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java ---------------------------------------------------------------------- diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java index 39c5f29..e0a542e 100644 --- a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java +++ b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java @@ -114,4 +114,20 @@ public abstract class BaseSchedulerTest extends LocalFileMetadataTestCase { } } } + + protected void runningJobToError(String jobId) { + while (true) { + try { + AbstractExecutable job = jobService.getJob(jobId); + ExecutableState status = job.getStatus(); + if (status == ExecutableState.RUNNING) { + scheduler.fetchFailed = true; + break; + } + Thread.sleep(1000); + } catch (Exception ex) { + logger.error("", ex); + } + } + } } 
http://git-wip-us.apache.org/repos/asf/kylin/blob/1e359901/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java ---------------------------------------------------------------------- diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java index c8b251d..5c51f59 100644 --- a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java +++ b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java @@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit; import org.apache.kylin.job.BaseTestExecutable; import org.apache.kylin.job.ErrorTestExecutable; import org.apache.kylin.job.FailedTestExecutable; +import org.apache.kylin.job.NoErrorStatusExecutable; import org.apache.kylin.job.SelfStopExecutable; import org.apache.kylin.job.SucceedTestExecutable; import org.apache.kylin.job.execution.DefaultChainedExecutable; @@ -145,4 +146,17 @@ public class DefaultSchedulerTest extends BaseSchedulerTest { assertFalse("countDownLatch2 should NOT reach zero in 15 secs", countDownLatch2.await(7, TimeUnit.SECONDS)); assertFalse("future2 should has been stopped", future2.cancel(true)); } + + @Test + public void tesMetaStoreRecover() throws Exception { + logger.info("tesMetaStoreRecover"); + NoErrorStatusExecutable job = new NoErrorStatusExecutable(); + ErrorTestExecutable task = new ErrorTestExecutable(); + job.addTask(task); + jobService.addJob(job); + Thread.sleep(2000); + runningJobToError(job.getId()); + Thread.sleep(2000); + Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState()); + } }