This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit ea4c009b6563f863b6a5184a6b83c25953c98391
Author: huangsheng <huangshen...@163.com>
AuthorDate: Thu Dec 8 10:27:34 2022 +0800

    KYLIN-5437 disable stage transfer states from DISCARDED to others
    
    KYLIN-5437 disable stage job transfer states from DISCARDED to any other states
---
 .../kylin/job/execution/AbstractExecutable.java    | 31 +++++++++++++++++-----
 .../kylin/job/execution/NExecutableManager.java    |  4 ++-
 .../org/apache/kylin/rest/service/JobService.java  |  2 +-
 .../org/apache/kylin/rest/service/StageTest.java   |  6 +++++
 4 files changed, 35 insertions(+), 8 deletions(-)

diff --git 
a/src/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
 
b/src/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index db308fc8f5..2baf95144c 100644
--- 
a/src/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ 
b/src/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -779,7 +779,8 @@ public abstract class AbstractExecutable implements 
Executable {
         val stagesMap = task.getStagesMap();
         if (stagesMap.size() == 1) {
             for (Map.Entry<String, List<StageBase>> entry : 
stagesMap.entrySet()) {
-                taskDuration = entry.getValue().stream().map(stage -> 
getDuration(stage.getOutput(entry.getKey()))) //
+                taskDuration = entry.getValue().stream()
+                        .map(stage -> 
getStageDuration(stage.getOutput(entry.getKey()), getParent())) //
                         .mapToLong(Long::valueOf) //
                         .sum();
             }
@@ -791,6 +792,28 @@ public abstract class AbstractExecutable implements 
Executable {
         return getDuration(getOutput());
     }
 
+    public static long computeDuration(Output output) {
+        if (output.getStartTime() == 0) {
+            return 0;
+        }
+        return output.getEndTime() == 0 ? System.currentTimeMillis() - 
output.getStartTime()
+                : output.getEndTime() - output.getStartTime();
+    }
+
+    // just used for the stage job
+    public static long getStageDuration(Output output, AbstractExecutable 
parent) {
+        if (output.getDuration() != 0) {
+            var duration = output.getDuration();
+            // If the parent job is not running, the duration of the stage is no longer counted no matter what state the stage is in
+            if (parent != null && parent.getStatus() == ExecutableState.RUNNING
+                    && ExecutableState.RUNNING == output.getState()) {
+                duration = duration + System.currentTimeMillis() - 
output.getLastRunningStartTime();
+            }
+            return duration;
+        }
+        return computeDuration(output);
+    }
+
     public static long getDuration(Output output) {
         if (output.getDuration() != 0) {
             var duration = output.getDuration();
@@ -799,11 +822,7 @@ public abstract class AbstractExecutable implements 
Executable {
             }
             return duration;
         }
-        if (output.getStartTime() == 0) {
-            return 0;
-        }
-        return output.getEndTime() == 0 ? System.currentTimeMillis() - 
output.getStartTime()
-                : output.getEndTime() - output.getStartTime();
+        return computeDuration(output);
     }
 
     public long getWaitTime() {
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java
 
b/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java
index 512d891320..66582f5bb8 100644
--- 
a/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java
+++ 
b/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java
@@ -1237,8 +1237,10 @@ public class NExecutableManager {
                         "[UNEXPECTED_THINGS_HAPPENED] wrong job state 
transfer! There is no valid state transfer from: {} to: {}, job id: {}",
                         oldStatus, newStatus, taskOrJobId);
             }
+            // DISCARDED must not be transferred to any other status
             if ((oldStatus == ExecutableState.PAUSED && newStatus == 
ExecutableState.ERROR)
-                    || (oldStatus == ExecutableState.SKIP && newStatus == 
ExecutableState.SUCCEED)) {
+                    || (oldStatus == ExecutableState.SKIP && newStatus == 
ExecutableState.SUCCEED)
+                    || oldStatus == ExecutableState.DISCARDED) {
                 return false;
             }
             if (isRestart || (oldStatus != ExecutableState.SUCCEED && 
oldStatus != ExecutableState.SKIP)) {
diff --git 
a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java
 
b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java
index 19795467f0..c9305850a9 100644
--- 
a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ 
b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -902,7 +902,7 @@ public class JobService extends BasicService implements 
JobSupporter, ISmartAppl
         result.setExecEndTime(AbstractExecutable.getEndTime(stageOutput));
         result.setCreateTime(AbstractExecutable.getCreateTime(stageOutput));
 
-        result.setDuration(AbstractExecutable.getDuration(stageOutput));
+        result.setDuration(AbstractExecutable.getStageDuration(stageOutput, 
task.getParent()));
 
         val indexCount = 
Optional.ofNullable(task.getParam(NBatchConstants.P_INDEX_COUNT)).orElse("0");
         result.setIndexCount(Long.parseLong(indexCount));
diff --git 
a/src/job-service/src/test/java/org/apache/kylin/rest/service/StageTest.java 
b/src/job-service/src/test/java/org/apache/kylin/rest/service/StageTest.java
index bf63baa1e2..b69c00eb9b 100644
--- a/src/job-service/src/test/java/org/apache/kylin/rest/service/StageTest.java
+++ b/src/job-service/src/test/java/org/apache/kylin/rest/service/StageTest.java
@@ -385,6 +385,12 @@ public class StageTest extends NLocalFileMetadataTestCase {
         Assert.assertFalse(flag);
         Assert.assertEquals("SKIP", jobOutput.getStatus());
 
+        jobOutput.setStatus(ExecutableState.DISCARDED.toString());
+        newStatus = ExecutableState.RUNNING;
+        flag = manager.setStageOutput(jobOutput, taskOrJobId, newStatus, 
updateInfo, failedMsg, isRestart);
+        Assert.assertFalse(flag);
+        Assert.assertEquals("DISCARDED", jobOutput.getStatus());
+
         jobOutput.setStatus(ExecutableState.READY.toString());
         newStatus = ExecutableState.SUCCEED;
         flag = manager.setStageOutput(jobOutput, taskOrJobId, newStatus, 
updateInfo, failedMsg, isRestart);

Reply via email to