This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit ea4c009b6563f863b6a5184a6b83c25953c98391 Author: huangsheng <huangshen...@163.com> AuthorDate: Thu Dec 8 10:27:34 2022 +0800 KYLIN-5437 disable stage transfer states from DISCARDED to others KYLIN-5437 disable stage job transfer states from DISCARDED to any other states --- .../kylin/job/execution/AbstractExecutable.java | 31 +++++++++++++++++----- .../kylin/job/execution/NExecutableManager.java | 4 ++- .../org/apache/kylin/rest/service/JobService.java | 2 +- .../org/apache/kylin/rest/service/StageTest.java | 6 +++++ 4 files changed, 35 insertions(+), 8 deletions(-) diff --git a/src/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/src/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java index db308fc8f5..2baf95144c 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java @@ -779,7 +779,8 @@ public abstract class AbstractExecutable implements Executable { val stagesMap = task.getStagesMap(); if (stagesMap.size() == 1) { for (Map.Entry<String, List<StageBase>> entry : stagesMap.entrySet()) { - taskDuration = entry.getValue().stream().map(stage -> getDuration(stage.getOutput(entry.getKey()))) // + taskDuration = entry.getValue().stream() + .map(stage -> getStageDuration(stage.getOutput(entry.getKey()), getParent())) // .mapToLong(Long::valueOf) // .sum(); } @@ -791,6 +792,28 @@ public abstract class AbstractExecutable implements Executable { return getDuration(getOutput()); } + public static long computeDuration(Output output) { + if (output.getStartTime() == 0) { + return 0; + } + return output.getEndTime() == 0 ? System.currentTimeMillis() - output.getStartTime() + : output.getEndTime() - output.getStartTime(); + } + + // just used for the stage job + public static long getStageDuration(Output output, AbstractExecutable parent) { + if (output.getDuration() != 0) { + var duration = output.getDuration(); + // If the parent job is not running, the duration of the stage is no longer counted no matter what state the stage is + if (parent != null && parent.getStatus() == ExecutableState.RUNNING + && ExecutableState.RUNNING == output.getState()) { + duration = duration + System.currentTimeMillis() - output.getLastRunningStartTime(); + } + return duration; + } + return computeDuration(output); + } + public static long getDuration(Output output) { if (output.getDuration() != 0) { var duration = output.getDuration(); @@ -799,11 +822,7 @@ public abstract class AbstractExecutable implements Executable { } return duration; } - if (output.getStartTime() == 0) { - return 0; - } - return output.getEndTime() == 0 ? System.currentTimeMillis() - output.getStartTime() - : output.getEndTime() - output.getStartTime(); + return computeDuration(output); } public long getWaitTime() { diff --git a/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java b/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java index 512d891320..66582f5bb8 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java @@ -1237,8 +1237,10 @@ public class NExecutableManager { "[UNEXPECTED_THINGS_HAPPENED] wrong job state transfer! There is no valid state transfer from: {} to: {}, job id: {}", oldStatus, newStatus, taskOrJobId); } + // DISCARDED must not be transferred to any others status if ((oldStatus == ExecutableState.PAUSED && newStatus == ExecutableState.ERROR) - || (oldStatus == ExecutableState.SKIP && newStatus == ExecutableState.SUCCEED)) { + || (oldStatus == ExecutableState.SKIP && newStatus == ExecutableState.SUCCEED) + || oldStatus == ExecutableState.DISCARDED) { return false; } if (isRestart || (oldStatus != ExecutableState.SUCCEED && oldStatus != ExecutableState.SKIP)) { diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java index 19795467f0..c9305850a9 100644 --- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java +++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -902,7 +902,7 @@ public class JobService extends BasicService implements JobSupporter, ISmartAppl result.setExecEndTime(AbstractExecutable.getEndTime(stageOutput)); result.setCreateTime(AbstractExecutable.getCreateTime(stageOutput)); - result.setDuration(AbstractExecutable.getDuration(stageOutput)); + result.setDuration(AbstractExecutable.getStageDuration(stageOutput, task.getParent())); val indexCount = Optional.ofNullable(task.getParam(NBatchConstants.P_INDEX_COUNT)).orElse("0"); result.setIndexCount(Long.parseLong(indexCount)); diff --git a/src/job-service/src/test/java/org/apache/kylin/rest/service/StageTest.java b/src/job-service/src/test/java/org/apache/kylin/rest/service/StageTest.java index bf63baa1e2..b69c00eb9b 100644 --- a/src/job-service/src/test/java/org/apache/kylin/rest/service/StageTest.java +++ b/src/job-service/src/test/java/org/apache/kylin/rest/service/StageTest.java @@ -385,6 +385,12 @@ public class StageTest extends NLocalFileMetadataTestCase { Assert.assertFalse(flag); Assert.assertEquals("SKIP", jobOutput.getStatus()); + jobOutput.setStatus(ExecutableState.DISCARDED.toString()); + newStatus = ExecutableState.RUNNING; + flag = manager.setStageOutput(jobOutput, taskOrJobId, newStatus, updateInfo, failedMsg, isRestart); + Assert.assertFalse(flag); + Assert.assertEquals("DISCARDED", jobOutput.getStatus()); + jobOutput.setStatus(ExecutableState.READY.toString()); newStatus = ExecutableState.SUCCEED; flag = manager.setStageOutput(jobOutput, taskOrJobId, newStatus, updateInfo, failedMsg, isRestart);