KYLIN-3136, enhance protection when job's status is illegal
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/a6d1ab02 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/a6d1ab02 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/a6d1ab02 Branch: refs/heads/master Commit: a6d1ab02234bcc021faed77fac17d74d3c1c7e30 Parents: 4eb6cd4 Author: Cheng Wang <cheng.w...@kyligence.io> Authored: Thu Dec 28 18:52:52 2017 +0800 Committer: Li Yang <liy...@apache.org> Committed: Thu Dec 28 21:34:52 2017 -0600 ---------------------------------------------------------------------- .../job/execution/DefaultChainedExecutable.java | 52 ++++++++++++++++---- .../apache/kylin/job/RunningTestExecutable.java | 39 +++++++++++++++ .../job/impl/threadpool/BaseSchedulerTest.java | 3 -- .../impl/threadpool/DefaultSchedulerTest.java | 20 ++++++++ 4 files changed, 102 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/a6d1ab02/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java index cbd49ae..8795e4c 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java @@ -43,7 +43,7 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai sub.initConfig(config); } } - + @Override protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { List<? extends Executable> executables = getTasks(); @@ -58,7 +58,8 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai // the job is paused break; } else if (state == ExecutableState.ERROR) { - throw new IllegalStateException("invalid subtask state, subtask:" + subTask.getName() + ", state:" + subTask.getStatus()); + throw new IllegalStateException( + "invalid subtask state, subtask:" + subTask.getName() + ", state:" + subTask.getStatus()); } if (subTask.isRunnable()) { return subTask.execute(context); @@ -88,7 +89,7 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai @Override protected void onExecuteFinished(ExecuteResult result, ExecutableContext executableContext) { ExecutableManager mgr = getManager(); - + if (isDiscarded()) { setEndTime(System.currentTimeMillis()); notifyUserStatusChange(executableContext, ExecutableState.DISCARDED); @@ -99,19 +100,26 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai List<? extends Executable> jobs = getTasks(); boolean allSucceed = true; boolean hasError = false; - boolean hasRunning = false; boolean hasDiscarded = false; for (Executable task : jobs) { + if (task.getStatus() == ExecutableState.RUNNING) { + logger.error( + "There shouldn't be a running subtask[jobId: {}, jobName: {}], \n" + + "it might cause endless state, will retry to fetch subtask's state.", + task.getId(), task.getName()); + boolean retryRet = retryFetchTaskStatus(task); + if (false == retryRet) + hasError = true; + } + final ExecutableState status = task.getStatus(); + if (status == ExecutableState.ERROR) { hasError = true; } if (status != ExecutableState.SUCCEED) { allSucceed = false; } - if (status == ExecutableState.RUNNING) { - hasRunning = true; - } if (status == ExecutableState.DISCARDED) { hasDiscarded = true; } @@ -124,8 +132,6 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai setEndTime(System.currentTimeMillis()); mgr.updateJobOutput(getId(), ExecutableState.ERROR, null, null); notifyUserStatusChange(executableContext, ExecutableState.ERROR); - } else if (hasRunning) { - mgr.updateJobOutput(getId(), ExecutableState.RUNNING, null, null); } else if (hasDiscarded) { setEndTime(System.currentTimeMillis()); mgr.updateJobOutput(getId(), ExecutableState.DISCARDED, null, null); @@ -163,4 +169,32 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai executable.setId(getId() + "-" + String.format("%02d", subTasks.size())); this.subTasks.add(executable); } + + private boolean retryFetchTaskStatus(Executable task) { + boolean hasRunning = false; + int retry = 1; + while (retry <= 10) { + ExecutableState retryState = task.getStatus(); + if (retryState == ExecutableState.RUNNING) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + logger.error("Failed to Sleep: ", e); + } + hasRunning = true; + logger.error("With {} times retry, it's state is still RUNNING", retry); + } else { + logger.info("With {} times retry, status is changed to: {}", retry, retryState); + hasRunning = false; + break; + } + retry++; + } + if (hasRunning) { + logger.error("Parent task: {} is finished, but it's subtask: {}'s state is still RUNNING \n" + + ", mark parent task failed.", getName(), task.getName()); + return false; + } + return true; + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/a6d1ab02/core-job/src/test/java/org/apache/kylin/job/RunningTestExecutable.java ---------------------------------------------------------------------- diff --git a/core-job/src/test/java/org/apache/kylin/job/RunningTestExecutable.java b/core-job/src/test/java/org/apache/kylin/job/RunningTestExecutable.java new file mode 100644 index 0000000..89057e6 --- /dev/null +++ b/core-job/src/test/java/org/apache/kylin/job/RunningTestExecutable.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.job; + +import org.apache.kylin.job.exception.ExecuteException; +import org.apache.kylin.job.execution.ExecutableContext; +import org.apache.kylin.job.execution.ExecuteResult; + +public class RunningTestExecutable extends BaseTestExecutable { + + public RunningTestExecutable() { + super(); + } + + @Override + protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } + return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed"); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/a6d1ab02/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java ---------------------------------------------------------------------- diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java index deaa425..d7201f2 100644 --- a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java +++ b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java @@ -39,9 +39,7 @@ import org.slf4j.LoggerFactory; public abstract class BaseSchedulerTest extends LocalFileMetadataTestCase { private static final Logger logger = LoggerFactory.getLogger(BaseSchedulerTest.class); - protected DefaultScheduler scheduler; - protected ExecutableManager execMgr; @Before @@ -81,7 +79,6 @@ public abstract class BaseSchedulerTest extends LocalFileMetadataTestCase { int error = 0; long start = System.currentTimeMillis(); final int errorLimit = 3; - while (error < errorLimit && (System.currentTimeMillis() - start < maxWaitTime)) { try { Thread.sleep(1000); http://git-wip-us.apache.org/repos/asf/kylin/blob/a6d1ab02/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java ---------------------------------------------------------------------- diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java index 63292f0..b1fc544 100644 --- a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java +++ b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java @@ -27,14 +27,17 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.job.BaseTestExecutable; import org.apache.kylin.job.ErrorTestExecutable; import org.apache.kylin.job.FailedTestExecutable; import org.apache.kylin.job.FiveSecondSucceedTestExecutable; import org.apache.kylin.job.NoErrorStatusExecutable; +import org.apache.kylin.job.RunningTestExecutable; import org.apache.kylin.job.SelfStopExecutable; import org.apache.kylin.job.SucceedTestExecutable; import org.apache.kylin.job.execution.DefaultChainedExecutable; +import org.apache.kylin.job.execution.ExecutableManager; import org.apache.kylin.job.execution.ExecutableState; import org.junit.Assert; import org.junit.Ignore; @@ -125,6 +128,23 @@ public class DefaultSchedulerTest extends BaseSchedulerTest { task1.waitForDoWork(); } + @Test + public void testIllegalState() throws Exception { + logger.info("testIllegalState"); + DefaultChainedExecutable job = new DefaultChainedExecutable(); + BaseTestExecutable task1 = new SucceedTestExecutable(); + BaseTestExecutable task2 = new RunningTestExecutable(); + job.addTask(task1); + job.addTask(task2); + execMgr.addJob(job); + ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv()).updateJobOutput(task2.getId(), + ExecutableState.RUNNING, null, null); + waitForJobFinish(job.getId(), 10000); + Assert.assertEquals(ExecutableState.ERROR, execMgr.getOutput(job.getId()).getState()); + Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task1.getId()).getState()); + Assert.assertEquals(ExecutableState.RUNNING, execMgr.getOutput(task2.getId()).getState()); + } + @SuppressWarnings("rawtypes") @Ignore("why test JDK feature?") @Test