This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit be6f0fd65b89dabf172e07b71fc924fe47e41289
Author: sibingzhang <74443791+sibingzh...@users.noreply.github.com>
AuthorDate: Wed Dec 7 10:45:30 2022 +0800

    KYLIN-5434 Fix abnormal stage status display after restarting a build job
---
 .../kylin/job/execution/ExecutableContext.java     | 16 +++++
 .../kylin/job/execution/NExecutableManager.java    | 15 ++++
 .../kylin/job/execution/ExecutableContextTest.java | 14 ++++
 .../org/apache/kylin/rest/service/JobService.java  | 10 ++-
 .../apache/kylin/rest/service/JobServiceTest.java  | 79 ++++++++++++++++++++--
 5 files changed, 129 insertions(+), 5 deletions(-)

diff --git a/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableContext.java b/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableContext.java
index a5799a9df9..0a7e76f518 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableContext.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableContext.java
@@ -20,8 +20,10 @@ package org.apache.kylin.job.execution;
 
 import java.util.Collections;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 
+import io.kyligence.kap.guava20.shaded.common.collect.Sets;
 import org.apache.kylin.common.KylinConfig;
 
 import io.kyligence.kap.guava20.shaded.common.collect.Maps;
@@ -46,12 +48,14 @@ public class ExecutableContext {
 
     private final ConcurrentMap<String, Thread> runningJobThreads;
     private final ConcurrentMap<String, Executable> runningJobs;
+    private final Set<String> frozenJobs;
     private final ConcurrentMap<String, Long> runningJobInfos;
     private final KylinConfig kylinConfig;
 
     public ExecutableContext(ConcurrentMap<String, Executable> runningJobs, ConcurrentMap<String, Long> runningJobInfos,
             KylinConfig kylinConfig, long epochId) {
         this.runningJobThreads = Maps.newConcurrentMap();
+        this.frozenJobs = Sets.newConcurrentHashSet();
         this.runningJobs = runningJobs;
         this.runningJobInfos = runningJobInfos;
         this.kylinConfig = kylinConfig;
@@ -82,6 +86,18 @@ public class ExecutableContext {
         return Collections.unmodifiableMap(runningJobs);
     }
 
+    public void addFrozenJob(String jobId) {
+        frozenJobs.add(jobId);
+    }
+
+    public boolean isFrozenJob(String jobId) {
+        return frozenJobs.contains(jobId);
+    }
+
+    public void removeFrozenJob(String jobId) {
+        frozenJobs.remove(jobId);
+    }
+
     public Map<String, Long> getRunningJobInfos() {
         return Collections.unmodifiableMap(runningJobInfos);
     }
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java b/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java
index 70b4fc1b84..5d62003967 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java
@@ -215,6 +215,21 @@ public class NExecutableManager {
         }
     }
 
+    public void addFrozenJob(String jobId) {
+        val scheduler = NDefaultScheduler.getInstance(project);
+        scheduler.getContext().addFrozenJob(jobId);
+    }
+
+    public void removeFrozenJob(String jobId) {
+        val scheduler = NDefaultScheduler.getInstance(project);
+        scheduler.getContext().removeFrozenJob(jobId);
+    }
+
+    public boolean isFrozenJob(String jobId) {
+        val scheduler = NDefaultScheduler.getInstance(project);
+        return scheduler.getContext().isFrozenJob(jobId);
+    }
+
     private void addJobOutput(ExecutablePO executable) {
         ExecutableOutputPO executableOutputPO = new ExecutableOutputPO();
         executable.setOutput(executableOutputPO);
diff --git a/src/core-job/src/test/java/org/apache/kylin/job/execution/ExecutableContextTest.java b/src/core-job/src/test/java/org/apache/kylin/job/execution/ExecutableContextTest.java
index faac5e0dfb..4de81e5810 100644
--- a/src/core-job/src/test/java/org/apache/kylin/job/execution/ExecutableContextTest.java
+++ b/src/core-job/src/test/java/org/apache/kylin/job/execution/ExecutableContextTest.java
@@ -18,8 +18,10 @@
 
 package org.apache.kylin.job.execution;
 
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.junit.annotation.MetadataInfo;
@@ -43,4 +45,16 @@ class ExecutableContextTest {
         context.removeRunningJob(job);
         assertNull(context.getRunningJobThread(job));
     }
+
+    @Test
+    void testGetFrozenJob() {
+        String jobId = "f6384d3e-d46d-5cea-b2d9-28510a2191f3-50b0f62d-e9c1-810b-e499-95aa549c701c";
+        val context = new ExecutableContext(Maps.newConcurrentMap(), Maps.newConcurrentMap(),
+                KylinConfig.getInstanceFromEnv(), 0);
+        context.addFrozenJob(jobId);
+        assertTrue(context.isFrozenJob(jobId));
+
+        context.removeFrozenJob(jobId);
+        assertFalse(context.isFrozenJob(jobId));
+    }
 }
\ No newline at end of file
diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java
index 494ac3b50e..19795467f0 100644
--- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -508,8 +508,11 @@ public class JobService extends BasicService implements JobSupporter, ISmartAppl
         case RESTART:
             SecondStorageUtil.checkJobRestart(project, jobId);
             executableManager.updateJobError(jobId, null, null, null, null);
+            executableManager.addFrozenJob(jobId);
             executableManager.restartJob(jobId);
-            UnitOfWork.get().doAfterUnit(afterUnitTask);
+            UnitOfWorkContext unitOfWorkContext = UnitOfWork.get();
+            unitOfWorkContext.doAfterUnit(afterUnitTask);
+            unitOfWorkContext.doAfterUnit(() -> executableManager.removeFrozenJob(jobId));
             break;
         case DISCARD:
             discardJob(project, jobId);
@@ -989,6 +992,11 @@ public class JobService extends BasicService implements JobSupporter, ISmartAppl
             Map<String, String> updateInfo, String errMsg) {
         final ExecutableState newStatus = convertToExecutableState(status);
         val jobId = NExecutableManager.extractJobId(taskId);
+        val jobManager = getManager(NExecutableManager.class, project);
+        boolean isFrozenJob = jobManager.isFrozenJob(jobId);
+        if (isFrozenJob) {
+            return;
+        }
         EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
             val executableManager = getManager(NExecutableManager.class, project);
             executableManager.updateStageStatus(taskId, segmentId, newStatus, updateInfo, errMsg);
diff --git a/src/job-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java b/src/job-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
index 0824ecb996..0de349d774 100644
--- a/src/job-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
+++ b/src/job-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
@@ -80,6 +80,7 @@ import org.apache.kylin.job.dao.ExecutableOutputPO;
 import org.apache.kylin.job.dao.ExecutablePO;
 import org.apache.kylin.job.dao.NExecutableDao;
 import org.apache.kylin.job.exception.PersistentException;
+import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.BaseTestExecutable;
 import org.apache.kylin.job.execution.ChainedExecutable;
@@ -95,6 +96,7 @@ import org.apache.kylin.job.execution.StageBase;
 import org.apache.kylin.job.execution.SucceedChainedTestExecutable;
 import org.apache.kylin.job.execution.SucceedDagTestExecutable;
 import org.apache.kylin.job.execution.SucceedTestExecutable;
+import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
 import org.apache.kylin.metadata.cube.model.NBatchConstants;
 import org.apache.kylin.metadata.cube.model.NDataflowManager;
 import org.apache.kylin.metadata.cube.model.NIndexPlanManager;
@@ -110,6 +112,7 @@ import org.apache.kylin.plugin.asyncprofiler.ProfilerStatus;
 import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.rest.request.JobFilter;
 import org.apache.kylin.rest.request.JobUpdateRequest;
+import org.apache.kylin.rest.request.StageRequest;
 import org.apache.kylin.rest.response.DataResult;
 import org.apache.kylin.rest.response.ExecutableResponse;
 import org.apache.kylin.rest.response.ExecutableStepResponse;
@@ -128,6 +131,7 @@ import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 import org.mockito.InjectMocks;
 import org.mockito.Mock;
+import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 import org.springframework.beans.factory.config.BeanDefinition;
 import org.springframework.context.ApplicationEvent;
@@ -392,16 +396,16 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
         Mockito.when(executableDao.getJobs(Mockito.anyLong(), Mockito.anyLong())).thenReturn(mockJobs);
         {
             List<String> jobNames = Lists.newArrayList();
-            JobFilter jobFilter = new JobFilter(Lists.newArrayList(), jobNames, 0, "", "", "default", "duration",
+            JobFilter jobFilter = new JobFilter(Lists.newArrayList(), jobNames, 0, "", "", "default", "total_duration",
                     true);
             List<ExecutableResponse> jobs = jobService.listJobs(jobFilter);
 
-            val durationArrays = jobs.stream().map(ExecutableResponse::getTotalDuration)
+            val totalDurationArrays = jobs.stream().map(ExecutableResponse::getTotalDuration)
                     .collect(Collectors.toList());
-            List<Long> copyDurationList = new ArrayList<>(durationArrays);
+            List<Long> copyDurationList = new ArrayList<>(totalDurationArrays);
             copyDurationList.sort(Collections.reverseOrder());
             Assert.assertEquals(3, copyDurationList.size());
-            Assert.assertEquals(durationArrays, copyDurationList);
+            Assert.assertEquals(totalDurationArrays, copyDurationList);
         }
 
         for (int i = 0; i < 3; i++) {
@@ -1439,6 +1443,29 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
         Assert.assertEquals(executableResponse, jobService.manageJob("default", executableResponse, "RESTART"));
     }
 
+    @Test
+    public void testRestartJob_AddAndRemoveFrozenJob() {
+        NExecutableManager manager = NExecutableManager.getInstance(getTestConfig(), project);
+
+        val job = new DefaultExecutable();
+        job.setProject(project);
+        manager.addJob(job);
+
+        NDefaultScheduler scheduler = NDefaultScheduler.getInstance(project);
+        scheduler.init(new JobEngineConfig(getTestConfig()));
+
+        try (MockedStatic<NDefaultScheduler> mockScheduler = Mockito.mockStatic(NDefaultScheduler.class)) {
+            mockScheduler.when(() -> NDefaultScheduler.getInstance(project)).thenReturn(scheduler);
+            UnitOfWork.doInTransactionWithRetry(() -> {
+                jobService.updateJobStatus(job.getId(), project, "RESTART");
+                Assert.assertTrue(manager.isFrozenJob(job.getId()));
+                return null;
+            }, project);
+
+            Assert.assertFalse(manager.isFrozenJob(job.getId()));
+        }
+    }
+
     @Test
     public void testCheckJobStatus() {
         jobService.checkJobStatus(Lists.newArrayList("RUNNING"));
@@ -1963,4 +1990,48 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
         jobService.onApplicationEvent(applicationEvent);
     }
 
+    @Test
+    public void testUpdateStageStatusFrozenJob() {
+        final String project = "default";
+        final String jobStatus = "ERROR";
+        final String jobId = "f6384d3e-d46d-5cea-b2d9-28510a2191f3-50b0f62d-e9c1-810b-e499-95aa549c701c";
+
+        StageRequest request = new StageRequest();
+        request.setProject(project);
+        request.setSegmentId("b");
+        request.setStatus(jobStatus);
+        request.setTaskId(jobId + "_00_00");
+
+        SucceedChainedTestExecutable job = new SucceedChainedTestExecutable();
+        job.setId(jobId);
+        NSparkExecutable sparkExecutable = new NSparkExecutable();
+        job.addTask(sparkExecutable);
+        NStageForBuild buildStage = new NStageForBuild();
+        sparkExecutable.addStage(buildStage);
+        sparkExecutable.setStageMap();
+
+        NExecutableManager executableManager = NExecutableManager.getInstance(getTestConfig(), project);
+        executableManager.addJob(job);
+
+        NDefaultScheduler scheduler = NDefaultScheduler.getInstance(project);
+        scheduler.init(new JobEngineConfig(getTestConfig()));
+
+        try (MockedStatic<NDefaultScheduler> nDefaultSchedulerMockedStatic = Mockito
+                .mockStatic(NDefaultScheduler.class)) {
+            nDefaultSchedulerMockedStatic.when(() -> NDefaultScheduler.getInstance(project)).thenReturn(scheduler);
+            Mockito.when(jobService.getManager(NExecutableManager.class, "default")).thenReturn(executableManager);
+
+            executableManager.addFrozenJob(job.getId());
+            jobService.updateStageStatus(request.getProject(), request.getTaskId(), request.getSegmentId(),
+                    request.getStatus(), request.getUpdateInfo(), request.getErrMsg());
+            executableManager.removeFrozenJob(job.getId());
+            assertNotEquals(executableManager.getAllJobs().get(0).getTasks().get(0).getStagesMap().get(jobId + "_00")
+                    .get(0).getOutput().getStatus(), jobStatus);
+
+            jobService.updateStageStatus(request.getProject(), request.getTaskId(), request.getSegmentId(),
+                    request.getStatus(), request.getUpdateInfo(), request.getErrMsg());
+            assertEquals(executableManager.getAllJobs().get(0).getTasks().get(0).getStagesMap().get(jobId + "_00")
+                    .get(0).getOutput().getStatus(), jobStatus);
+        }
+    }
 }

Reply via email to