This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit be6f0fd65b89dabf172e07b71fc924fe47e41289 Author: sibingzhang <74443791+sibingzh...@users.noreply.github.com> AuthorDate: Wed Dec 7 10:45:30 2022 +0800 KYLIN-5434 Fix restart the build job, and the stage status displays abnormally --- .../kylin/job/execution/ExecutableContext.java | 16 +++++ .../kylin/job/execution/NExecutableManager.java | 15 ++++ .../kylin/job/execution/ExecutableContextTest.java | 14 ++++ .../org/apache/kylin/rest/service/JobService.java | 10 ++- .../apache/kylin/rest/service/JobServiceTest.java | 79 ++++++++++++++++++++-- 5 files changed, 129 insertions(+), 5 deletions(-) diff --git a/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableContext.java b/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableContext.java index a5799a9df9..0a7e76f518 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableContext.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableContext.java @@ -20,8 +20,10 @@ package org.apache.kylin.job.execution; import java.util.Collections; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentMap; +import io.kyligence.kap.guava20.shaded.common.collect.Sets; import org.apache.kylin.common.KylinConfig; import io.kyligence.kap.guava20.shaded.common.collect.Maps; @@ -46,12 +48,14 @@ public class ExecutableContext { private final ConcurrentMap<String, Thread> runningJobThreads; private final ConcurrentMap<String, Executable> runningJobs; + private final Set<String> frozenJobs; private final ConcurrentMap<String, Long> runningJobInfos; private final KylinConfig kylinConfig; public ExecutableContext(ConcurrentMap<String, Executable> runningJobs, ConcurrentMap<String, Long> runningJobInfos, KylinConfig kylinConfig, long epochId) { this.runningJobThreads = Maps.newConcurrentMap(); + this.frozenJobs = Sets.newConcurrentHashSet(); this.runningJobs = runningJobs; this.runningJobInfos = runningJobInfos; this.kylinConfig = kylinConfig; @@ -82,6 +86,18 @@ public class ExecutableContext { return Collections.unmodifiableMap(runningJobs); } + public void addFrozenJob(String jobId) { + frozenJobs.add(jobId); + } + + public boolean isFrozenJob(String jobId) { + return frozenJobs.contains(jobId); + } + + public void removeFrozenJob(String jobId) { + frozenJobs.remove(jobId); + } + public Map<String, Long> getRunningJobInfos() { return Collections.unmodifiableMap(runningJobInfos); } diff --git a/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java b/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java index 70b4fc1b84..5d62003967 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java @@ -215,6 +215,21 @@ public class NExecutableManager { } } + public void addFrozenJob(String jobId) { + val scheduler = NDefaultScheduler.getInstance(project); + scheduler.getContext().addFrozenJob(jobId); + } + + public void removeFrozenJob(String jobId) { + val scheduler = NDefaultScheduler.getInstance(project); + scheduler.getContext().removeFrozenJob(jobId); + } + + public boolean isFrozenJob(String jobId) { + val scheduler = NDefaultScheduler.getInstance(project); + return scheduler.getContext().isFrozenJob(jobId); + } + private void addJobOutput(ExecutablePO executable) { ExecutableOutputPO executableOutputPO = new ExecutableOutputPO(); executable.setOutput(executableOutputPO); diff --git a/src/core-job/src/test/java/org/apache/kylin/job/execution/ExecutableContextTest.java b/src/core-job/src/test/java/org/apache/kylin/job/execution/ExecutableContextTest.java index faac5e0dfb..4de81e5810 100644 --- a/src/core-job/src/test/java/org/apache/kylin/job/execution/ExecutableContextTest.java +++ b/src/core-job/src/test/java/org/apache/kylin/job/execution/ExecutableContextTest.java @@ -18,8 +18,10 @@ package org.apache.kylin.job.execution; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.junit.annotation.MetadataInfo; @@ -43,4 +45,16 @@ class ExecutableContextTest { context.removeRunningJob(job); assertNull(context.getRunningJobThread(job)); } + + @Test + void testGetFrozenJob() { + String jobId = "f6384d3e-d46d-5cea-b2d9-28510a2191f3-50b0f62d-e9c1-810b-e499-95aa549c701c"; + val context = new ExecutableContext(Maps.newConcurrentMap(), Maps.newConcurrentMap(), + KylinConfig.getInstanceFromEnv(), 0); + context.addFrozenJob(jobId); + assertTrue(context.isFrozenJob(jobId)); + + context.removeFrozenJob(jobId); + assertFalse(context.isFrozenJob(jobId)); + } } \ No newline at end of file diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java index 494ac3b50e..19795467f0 100644 --- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java +++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -508,8 +508,11 @@ public class JobService extends BasicService implements JobSupporter, ISmartAppl case RESTART: SecondStorageUtil.checkJobRestart(project, jobId); executableManager.updateJobError(jobId, null, null, null, null); + executableManager.addFrozenJob(jobId); executableManager.restartJob(jobId); - UnitOfWork.get().doAfterUnit(afterUnitTask); + UnitOfWorkContext unitOfWorkContext = UnitOfWork.get(); + unitOfWorkContext.doAfterUnit(afterUnitTask); + unitOfWorkContext.doAfterUnit(() -> executableManager.removeFrozenJob(jobId)); break; case DISCARD: discardJob(project, jobId); @@ -989,6 +992,11 @@ public class JobService extends BasicService implements JobSupporter, ISmartAppl Map<String, String> updateInfo, String errMsg) { final ExecutableState newStatus = convertToExecutableState(status); val jobId = NExecutableManager.extractJobId(taskId); + val jobManager = getManager(NExecutableManager.class, project); + boolean isFrozenJob = jobManager.isFrozenJob(jobId); + if (isFrozenJob) { + return; + } EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> { val executableManager = getManager(NExecutableManager.class, project); executableManager.updateStageStatus(taskId, segmentId, newStatus, updateInfo, errMsg); diff --git a/src/job-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java b/src/job-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java index 0824ecb996..0de349d774 100644 --- a/src/job-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java +++ b/src/job-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java @@ -80,6 +80,7 @@ import org.apache.kylin.job.dao.ExecutableOutputPO; import org.apache.kylin.job.dao.ExecutablePO; import org.apache.kylin.job.dao.NExecutableDao; import org.apache.kylin.job.exception.PersistentException; +import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.BaseTestExecutable; import org.apache.kylin.job.execution.ChainedExecutable; @@ -95,6 +96,7 @@ import org.apache.kylin.job.execution.StageBase; import org.apache.kylin.job.execution.SucceedChainedTestExecutable; import org.apache.kylin.job.execution.SucceedDagTestExecutable; import org.apache.kylin.job.execution.SucceedTestExecutable; +import org.apache.kylin.job.impl.threadpool.NDefaultScheduler; import org.apache.kylin.metadata.cube.model.NBatchConstants; import org.apache.kylin.metadata.cube.model.NDataflowManager; import org.apache.kylin.metadata.cube.model.NIndexPlanManager; @@ -110,6 +112,7 @@ import org.apache.kylin.plugin.asyncprofiler.ProfilerStatus; import org.apache.kylin.rest.constant.Constant; import org.apache.kylin.rest.request.JobFilter; import org.apache.kylin.rest.request.JobUpdateRequest; +import org.apache.kylin.rest.request.StageRequest; import org.apache.kylin.rest.response.DataResult; import org.apache.kylin.rest.response.ExecutableResponse; import org.apache.kylin.rest.response.ExecutableStepResponse; @@ -128,6 +131,7 @@ import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; import org.mockito.InjectMocks; import org.mockito.Mock; +import org.mockito.MockedStatic; import org.mockito.Mockito; import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.context.ApplicationEvent; @@ -392,16 +396,16 @@ public class JobServiceTest extends NLocalFileMetadataTestCase { Mockito.when(executableDao.getJobs(Mockito.anyLong(), Mockito.anyLong())).thenReturn(mockJobs); { List<String> jobNames = Lists.newArrayList(); - JobFilter jobFilter = new JobFilter(Lists.newArrayList(), jobNames, 0, "", "", "default", "duration", + JobFilter jobFilter = new JobFilter(Lists.newArrayList(), jobNames, 0, "", "", "default", "total_duration", true); List<ExecutableResponse> jobs = jobService.listJobs(jobFilter); - val durationArrays = jobs.stream().map(ExecutableResponse::getTotalDuration) + val totalDurationArrays = jobs.stream().map(ExecutableResponse::getTotalDuration) .collect(Collectors.toList()); - List<Long> copyDurationList = new ArrayList<>(durationArrays); + List<Long> copyDurationList = new ArrayList<>(totalDurationArrays); copyDurationList.sort(Collections.reverseOrder()); Assert.assertEquals(3, copyDurationList.size()); - Assert.assertEquals(durationArrays, copyDurationList); + Assert.assertEquals(totalDurationArrays, copyDurationList); } for (int i = 0; i < 3; i++) { @@ -1439,6 +1443,29 @@ public class JobServiceTest extends NLocalFileMetadataTestCase { Assert.assertEquals(executableResponse, jobService.manageJob("default", executableResponse, "RESTART")); } + @Test + public void testRestartJob_AddAndRemoveFrozenJob() { + NExecutableManager manager = NExecutableManager.getInstance(getTestConfig(), project); + + val job = new DefaultExecutable(); + job.setProject(project); + manager.addJob(job); + + NDefaultScheduler scheduler = NDefaultScheduler.getInstance(project); + scheduler.init(new JobEngineConfig(getTestConfig())); + + try (MockedStatic<NDefaultScheduler> mockScheduler = Mockito.mockStatic(NDefaultScheduler.class)) { + mockScheduler.when(() -> NDefaultScheduler.getInstance(project)).thenReturn(scheduler); + UnitOfWork.doInTransactionWithRetry(() -> { + jobService.updateJobStatus(job.getId(), project, "RESTART"); + Assert.assertTrue(manager.isFrozenJob(job.getId())); + return null; + }, project); + + Assert.assertFalse(manager.isFrozenJob(job.getId())); + } + } + @Test public void testCheckJobStatus() { jobService.checkJobStatus(Lists.newArrayList("RUNNING")); @@ -1963,4 +1990,48 @@ public class JobServiceTest extends NLocalFileMetadataTestCase { jobService.onApplicationEvent(applicationEvent); } + @Test + public void testUpdateStageStatusFrozenJob() { + final String project = "default"; + final String jobStatus = "ERROR"; + final String jobId = "f6384d3e-d46d-5cea-b2d9-28510a2191f3-50b0f62d-e9c1-810b-e499-95aa549c701c"; + + StageRequest request = new StageRequest(); + request.setProject(project); + request.setSegmentId("b"); + request.setStatus(jobStatus); + request.setTaskId(jobId + "_00_00"); + + SucceedChainedTestExecutable job = new SucceedChainedTestExecutable(); + job.setId(jobId); + NSparkExecutable sparkExecutable = new NSparkExecutable(); + job.addTask(sparkExecutable); + NStageForBuild buildStage = new NStageForBuild(); + sparkExecutable.addStage(buildStage); + sparkExecutable.setStageMap(); + + NExecutableManager executableManager = NExecutableManager.getInstance(getTestConfig(), project); + executableManager.addJob(job); + + NDefaultScheduler scheduler = NDefaultScheduler.getInstance(project); + scheduler.init(new JobEngineConfig(getTestConfig())); + + try (MockedStatic<NDefaultScheduler> nDefaultSchedulerMockedStatic = Mockito + .mockStatic(NDefaultScheduler.class)) { + nDefaultSchedulerMockedStatic.when(() -> NDefaultScheduler.getInstance(project)).thenReturn(scheduler); + Mockito.when(jobService.getManager(NExecutableManager.class, "default")).thenReturn(executableManager); + + executableManager.addFrozenJob(job.getId()); + jobService.updateStageStatus(request.getProject(), request.getTaskId(), request.getSegmentId(), + request.getStatus(), request.getUpdateInfo(), request.getErrMsg()); + executableManager.removeFrozenJob(job.getId()); + assertNotEquals(executableManager.getAllJobs().get(0).getTasks().get(0).getStagesMap().get(jobId + "_00") + .get(0).getOutput().getStatus(), jobStatus); + + jobService.updateStageStatus(request.getProject(), request.getTaskId(), request.getSegmentId(), + request.getStatus(), request.getUpdateInfo(), request.getErrMsg()); + assertEquals(executableManager.getAllJobs().get(0).getTasks().get(0).getStagesMap().get(jobId + "_00") + .get(0).getOutput().getStatus(), jobStatus); + } + } }