This is an automated email from the ASF dual-hosted git repository. nju_yaho pushed a commit to tag ebay-3.1.0-release-20200701 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 23bc5ac8949e45a088a1891f51e22476801f1e44 Author: Wang gang <gwa...@ebay.com> AuthorDate: Wed Jun 24 11:18:21 2020 +0800 KYLIN-3140 Auto merge jobs should not be counted in kylin.cube.max-building-segments --- .../java/org/apache/kylin/cube/CubeManager.java | 9 +++ .../kylin/rest/controller/CubeController.java | 13 ----- .../org/apache/kylin/rest/service/CubeService.java | 3 + .../org/apache/kylin/rest/service/JobService.java | 39 +++++++++++++ .../apache/kylin/rest/service/JobServiceTest.java | 66 ++++++++++++++++++++-- .../org/apache/kylin/tool/job/CubeBuildingCLI.java | 13 +++-- 6 files changed, 119 insertions(+), 24 deletions(-) diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index a61a409..b255dd8 100755 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -349,6 +349,15 @@ public class CubeManager implements IRealizationProvider { } } + public CubeInstance updateCubeSegments(CubeInstance cube, CubeSegment... segsToUpdate) throws IOException { + try (AutoLock lock = cubeMapLock.lockForWrite()) { + cube = cube.latestCopyForWrite(); // get a latest copy + CubeUpdate update = new CubeUpdate(cube); + update.setToUpdateSegs(segsToUpdate); + return updateCube(update); + } + } + public CubeInstance dropOptmizingSegments(CubeInstance cube, CubeSegment... segsToDrop) throws IOException { try (AutoLock lock = cubeMapLock.lockForWrite()) { cube = cube.latestCopyForWrite(); // get a latest copy diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java index baedcd9..52b7ae5 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java @@ -70,7 +70,6 @@ import org.apache.kylin.rest.exception.BadRequestException; import org.apache.kylin.rest.exception.ForbiddenException; import org.apache.kylin.rest.exception.InternalErrorException; import org.apache.kylin.rest.exception.NotFoundException; -import org.apache.kylin.rest.exception.TooManyRequestException; import org.apache.kylin.rest.msg.Message; import org.apache.kylin.rest.msg.MsgPicker; import org.apache.kylin.rest.request.CubeRequest; @@ -419,7 +418,6 @@ public class CubeController extends BasicController { String submitter = SecurityContextHolder.getContext().getAuthentication().getName(); CubeInstance cube = jobService.getCubeManager().getCube(cubeName); - checkBuildingSegment(cube); return jobService.submitJob(cube, tsRange, segRange, sourcePartitionOffsetStart, sourcePartitionOffsetEnd, CubeBuildTypeEnum.valueOf(buildType), force, submitter, priorityOffset); } catch (Throwable e) { @@ -1244,17 +1242,6 @@ public class CubeController extends BasicController { } } - private void checkBuildingSegment(CubeInstance cube) { - checkBuildingSegment(cube, cube.getConfig().getMaxBuildingSegments()); - } - - private void checkBuildingSegment(CubeInstance cube, int maxBuildingSeg) { - if (cube.getBuildingSegments().size() >= maxBuildingSeg) { - throw new TooManyRequestException( - "There is already " + cube.getBuildingSegments().size() + " building segment; "); - } - } - public void setCubeService(CubeService cubeService) { this.cubeService = cubeService; } diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java index 4793510..1a4b468 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java @@ -782,6 +782,9 @@ public class CubeService extends BasicService implements InitializingBean { logger.info("Will submit merge job on " + newSeg); DefaultChainedExecutable job = EngineFactory.createBatchMergeJob(newSeg, "SYSTEM"); getExecutableManager().addJob(job); + + newSeg.setLastBuildJobID(job.getId()); + getCubeManager().updateCubeSegments(cube, newSeg); return job.getId(); } else { logger.info("Not ready for merge on cube " + cubeName); diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java index f9880bd..9ee5fb4 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -80,6 +80,7 @@ import org.apache.kylin.metadata.model.TableExtDesc; import org.apache.kylin.metadata.model.TableRef; import org.apache.kylin.metadata.realization.RealizationStatusEnum; import org.apache.kylin.rest.exception.BadRequestException; +import org.apache.kylin.rest.exception.TooManyRequestException; import org.apache.kylin.rest.msg.Message; import org.apache.kylin.rest.msg.MsgPicker; import org.apache.kylin.rest.response.ResponseCode; @@ -252,6 +253,7 @@ public class JobService extends BasicService implements InitializingBean { checkCubeDescSignature(cube); checkAllowBuilding(cube); + checkBuildingSegment(cube); if (buildType == CubeBuildTypeEnum.BUILD || buildType == CubeBuildTypeEnum.REFRESH) { checkAllowParallelBuilding(cube); @@ -281,6 +283,9 @@ public class JobService extends BasicService implements InitializingBean { getExecutableManager().addJob(job); + //To add job id to the segment info for future system job check. + newSeg.setLastBuildJobID(job.getId()); + getCubeManager().updateCubeSegments(cube, newSeg); } catch (Exception e) { if (newSeg != null) { logger.error("Job submission might failed for NEW segment {}, will clean the NEW segment from cube", @@ -354,6 +359,8 @@ public class JobService extends BasicService implements InitializingBean { optimizeJobList.add(optimizeJob); optimizeJobInstances.add(getSingleJobInstance(optimizeJob)); + + optimizeSegment.setLastBuildJobID(optimizeJob.getId()); } /** Add checkpoint job for batch jobs */ @@ -362,6 +369,8 @@ public class JobService extends BasicService implements InitializingBean { getExecutableManager().addJob(checkpointJob); + getCubeManager().updateCubeSegments(cube, optimizeSegments); + try { sendTriggerOptimizeMail(cube, checkpointJob.getName(), checkpointJob.getProjectName(), submitter); } catch (Exception e) { @@ -457,6 +466,9 @@ public class JobService extends BasicService implements InitializingBean { getExecutableManager().addJob(optimizeJob); + optimizeSegment.setLastBuildJobID(optimizeJob.getId()); + getCubeManager().updateCubeSegments(cubeInstance, optimizeSegment); + JobInstance optimizeJobInstance = getSingleJobInstance(optimizeJob); /** Update the checkpoint job */ @@ -531,6 +543,33 @@ public class JobService extends BasicService implements InitializingBean { } } + private void checkBuildingSegment(CubeInstance cube) { + checkBuildingSegment(cube, cube.getConfig().getMaxBuildingSegments()); + } + + private void checkBuildingSegment(CubeInstance cube, int maxBuildingSeg) { + if (cube.getBuildingSegments().size() < maxBuildingSeg) { + return; + } + + int buildingSegments = 0; + for (CubeSegment segment : cube.getBuildingSegments()) { + buildingSegments++; + + String jobId = segment.getLastBuildJobID(); + if (!StringUtil.isEmpty(jobId)) { + AbstractExecutable job = getExecutableManager().getJob(jobId); + if (job != null && "SYSTEM".equalsIgnoreCase(job.getSubmitter())) { + // exclude the system jobs + buildingSegments--; + } + } + } + if (buildingSegments >= maxBuildingSeg) { + throw new TooManyRequestException("There is already " + buildingSegments + " building segment; "); + } + } + public JobInstance getJobInstance(String uuid) { AbstractExecutable job = getExecutableManager().getJob(uuid); if (job instanceof CheckpointExecutable) { diff --git a/server/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java index a617b7c..b2eba98 100644 --- a/server/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java +++ b/server/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java @@ -18,6 +18,14 @@ package org.apache.kylin.rest.service; +import java.io.IOException; +import java.sql.SQLException; +import java.util.Collections; +import java.util.List; + +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.model.CubeBuildTypeEnum; import org.apache.kylin.engine.mr.CubingJob; import org.apache.kylin.job.constant.JobTimeFilterEnum; import org.apache.kylin.job.exception.ExecuteException; @@ -28,18 +36,16 @@ import org.apache.kylin.job.execution.ExecutableManager; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.execution.ExecuteResult; import org.apache.kylin.job.execution.Output; +import org.apache.kylin.metadata.model.SegmentRange; +import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.query.QueryConnection; +import org.apache.kylin.rest.exception.TooManyRequestException; import org.junit.Assert; import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; -import java.io.IOException; -import java.sql.SQLException; -import java.util.Collections; -import java.util.List; - /** * @author xduo */ @@ -60,7 +66,8 @@ public class JobServiceTest extends ServiceTestBase { Assert.assertNotNull(jobService.getDataModelManager()); Assert.assertNotNull(QueryConnection.getConnection(ProjectInstance.DEFAULT_PROJECT_NAME)); Assert.assertNull(jobService.getJobInstance("job_not_exist")); - Assert.assertNotNull(jobService.searchJobs(null, null, null, 0, 0, JobTimeFilterEnum.ALL, JobService.JobSearchMode.ALL)); + Assert.assertNotNull( + jobService.searchJobs(null, null, null, 0, 0, JobTimeFilterEnum.ALL, JobService.JobSearchMode.ALL)); } @Test @@ -74,6 +81,53 @@ public class JobServiceTest extends ServiceTestBase { Assert.assertEquals(0, jobs.size()); } + @Test + public void testConcurrencyJobs() throws IOException { + int maxBuildingSegments = 5; + + System.setProperty("kylin.cube.max-building-segments", String.valueOf(maxBuildingSegments)); + System.setProperty("kylin.cube.ignore-signature-inconsistency", "true"); + CubeManager cubeManager = CubeManager.getInstance(jobService.getConfig()); + CubeInstance cube = cubeManager.getCube("ssb"); + + jobService.submitJobInternal(cube, new SegmentRange.TSRange(1L, 136468L), null, null, null, + CubeBuildTypeEnum.BUILD, false, "test", 1); + cube = cubeManager.getCube(cube.getName()); + cubeManager.updateCubeSegStatus(cube.getSegments().get(0), SegmentStatusEnum.READY); + + int i = 1; + for (; i <= maxBuildingSegments - 1; i++) { + cube = cubeManager.getCube(cube.getName()); + jobService.submitJobInternal(cube, new SegmentRange.TSRange(136468L * i, 136468L * (i + 1)), null, null, + null, CubeBuildTypeEnum.BUILD, false, "test", 1); + } + + jobService.submitJobInternal(cube, new SegmentRange.TSRange(136468L * i, 136468L * (i + 1)), null, null, null, + CubeBuildTypeEnum.BUILD, false, "SYSTEM", 1); + i++; + + Exception ex = null; + try { + cube = cubeManager.getCube(cube.getName()); + jobService.submitJobInternal(cube, new SegmentRange.TSRange(136468L * i, 136468L * (i + 1)), null, null, + null, CubeBuildTypeEnum.BUILD, false, "test", 1); + i++; + } catch (TooManyRequestException e) { + ex = e; + } + Assert.assertNull(ex); + + try { + cube = cubeManager.getCube(cube.getName()); + jobService.submitJobInternal(cube, new SegmentRange.TSRange(136468L * i, 136468L * (i + 1)), null, null, + null, CubeBuildTypeEnum.BUILD, false, "test", 1); + i++; + } catch (TooManyRequestException e) { + ex = e; + } + Assert.assertNotNull(ex); + } + public static class TestJob extends CubingJob { public TestJob() { diff --git a/tool/src/main/java/org/apache/kylin/tool/job/CubeBuildingCLI.java b/tool/src/main/java/org/apache/kylin/tool/job/CubeBuildingCLI.java index c651728..41a2398 100644 --- a/tool/src/main/java/org/apache/kylin/tool/job/CubeBuildingCLI.java +++ b/tool/src/main/java/org/apache/kylin/tool/job/CubeBuildingCLI.java @@ -96,7 +96,7 @@ public class CubeBuildingCLI extends AbstractApplication { Preconditions.checkArgument(cube != null, "Cube named " + cubeName + " does not exist!!!"); CubeBuildTypeEnum buildTypeEnum = CubeBuildTypeEnum.valueOf(buildType); Preconditions.checkArgument(buildTypeEnum != null, "Build type named " + buildType + " does not exist!!!"); - submitJob(cube, new TSRange(startDate, endDate), buildTypeEnum, false, "SYSTEM"); + submitJob(cube, new TSRange(startDate, endDate), buildTypeEnum, false, "SYSTEM2"); } private void submitJob(CubeInstance cube, TSRange tsRange, CubeBuildTypeEnum buildType, @@ -105,19 +105,22 @@ public class CubeBuildingCLI extends AbstractApplication { DefaultChainedExecutable job; + CubeSegment newSeg; if (buildType == CubeBuildTypeEnum.BUILD) { - CubeSegment newSeg = cubeManager.appendSegment(cube, tsRange); + newSeg = cubeManager.appendSegment(cube, tsRange); job = EngineFactory.createBatchCubingJob(newSeg, submitter, null); } else if (buildType == CubeBuildTypeEnum.MERGE) { - CubeSegment newSeg = cubeManager.mergeSegments(cube, tsRange, null, forceMergeEmptySeg); + newSeg = cubeManager.mergeSegments(cube, tsRange, null, forceMergeEmptySeg); job = EngineFactory.createBatchMergeJob(newSeg, submitter); } else if (buildType == CubeBuildTypeEnum.REFRESH) { - CubeSegment refreshSeg = cubeManager.refreshSegment(cube, tsRange, null); - job = EngineFactory.createBatchCubingJob(refreshSeg, submitter, null); + newSeg = cubeManager.refreshSegment(cube, tsRange, null); + job = EngineFactory.createBatchCubingJob(newSeg, submitter, null); } else { throw new JobException("invalid build type:" + buildType); } + executableManager.addJob(job); + cubeManager.updateCubeSegments(cube, newSeg); } private void checkCubeDescSignature(CubeInstance cube) {