This is an automated email from the ASF dual-hosted git repository.

nju_yaho pushed a commit to tag ebay-3.1.0-release-20200701
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 23bc5ac8949e45a088a1891f51e22476801f1e44
Author: Wang gang <gwa...@ebay.com>
AuthorDate: Wed Jun 24 11:18:21 2020 +0800

    KYLIN-3140 Auto merge jobs should not be counted in kylin.cube.max-building-segments
---
 .../java/org/apache/kylin/cube/CubeManager.java    |  9 +++
 .../kylin/rest/controller/CubeController.java      | 13 -----
 .../org/apache/kylin/rest/service/CubeService.java |  3 +
 .../org/apache/kylin/rest/service/JobService.java  | 39 +++++++++++++
 .../apache/kylin/rest/service/JobServiceTest.java  | 66 ++++++++++++++++++++--
 .../org/apache/kylin/tool/job/CubeBuildingCLI.java | 13 +++--
 6 files changed, 119 insertions(+), 24 deletions(-)

diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index a61a409..b255dd8 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -349,6 +349,15 @@ public class CubeManager implements IRealizationProvider {
         }
     }
 
+    public CubeInstance updateCubeSegments(CubeInstance cube, CubeSegment... segsToUpdate) throws IOException {
+        try (AutoLock lock = cubeMapLock.lockForWrite()) {
+            cube = cube.latestCopyForWrite(); // get a latest copy
+            CubeUpdate update = new CubeUpdate(cube);
+            update.setToUpdateSegs(segsToUpdate);
+            return updateCube(update);
+        }
+    }
+
     public CubeInstance dropOptmizingSegments(CubeInstance cube, 
CubeSegment... segsToDrop) throws IOException {
         try (AutoLock lock = cubeMapLock.lockForWrite()) {
             cube = cube.latestCopyForWrite(); // get a latest copy
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index baedcd9..52b7ae5 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -70,7 +70,6 @@ import org.apache.kylin.rest.exception.BadRequestException;
 import org.apache.kylin.rest.exception.ForbiddenException;
 import org.apache.kylin.rest.exception.InternalErrorException;
 import org.apache.kylin.rest.exception.NotFoundException;
-import org.apache.kylin.rest.exception.TooManyRequestException;
 import org.apache.kylin.rest.msg.Message;
 import org.apache.kylin.rest.msg.MsgPicker;
 import org.apache.kylin.rest.request.CubeRequest;
@@ -419,7 +418,6 @@ public class CubeController extends BasicController {
             String submitter = 
SecurityContextHolder.getContext().getAuthentication().getName();
             CubeInstance cube = jobService.getCubeManager().getCube(cubeName);
 
-            checkBuildingSegment(cube);
             return jobService.submitJob(cube, tsRange, segRange, 
sourcePartitionOffsetStart, sourcePartitionOffsetEnd,
                     CubeBuildTypeEnum.valueOf(buildType), force, submitter, 
priorityOffset);
         } catch (Throwable e) {
@@ -1244,17 +1242,6 @@ public class CubeController extends BasicController {
         }
     }
 
-    private void checkBuildingSegment(CubeInstance cube) {
-        checkBuildingSegment(cube, cube.getConfig().getMaxBuildingSegments());
-    }
-
-    private void checkBuildingSegment(CubeInstance cube, int maxBuildingSeg) {
-        if (cube.getBuildingSegments().size() >= maxBuildingSeg) {
-            throw new TooManyRequestException(
-                    "There is already " + cube.getBuildingSegments().size() + 
" building segment; ");
-        }
-    }
-
     public void setCubeService(CubeService cubeService) {
         this.cubeService = cubeService;
     }
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
index 4793510..1a4b468 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -782,6 +782,9 @@ public class CubeService extends BasicService implements InitializingBean {
                     logger.info("Will submit merge job on " + newSeg);
                     DefaultChainedExecutable job = 
EngineFactory.createBatchMergeJob(newSeg, "SYSTEM");
                     getExecutableManager().addJob(job);
+
+                    newSeg.setLastBuildJobID(job.getId());
+                    getCubeManager().updateCubeSegments(cube, newSeg);
                     return job.getId();
                 } else {
                     logger.info("Not ready for merge on cube " + cubeName);
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index f9880bd..9ee5fb4 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -80,6 +80,7 @@ import org.apache.kylin.metadata.model.TableExtDesc;
 import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.rest.exception.BadRequestException;
+import org.apache.kylin.rest.exception.TooManyRequestException;
 import org.apache.kylin.rest.msg.Message;
 import org.apache.kylin.rest.msg.MsgPicker;
 import org.apache.kylin.rest.response.ResponseCode;
@@ -252,6 +253,7 @@ public class JobService extends BasicService implements InitializingBean {
 
         checkCubeDescSignature(cube);
         checkAllowBuilding(cube);
+        checkBuildingSegment(cube);
 
         if (buildType == CubeBuildTypeEnum.BUILD || buildType == 
CubeBuildTypeEnum.REFRESH) {
             checkAllowParallelBuilding(cube);
@@ -281,6 +283,9 @@ public class JobService extends BasicService implements InitializingBean {
 
             getExecutableManager().addJob(job);
 
+            //To add job id to the segment info for future system job check.
+            newSeg.setLastBuildJobID(job.getId());
+            getCubeManager().updateCubeSegments(cube, newSeg);
         } catch (Exception e) {
             if (newSeg != null) {
                 logger.error("Job submission might failed for NEW segment {}, 
will clean the NEW segment from cube",
@@ -354,6 +359,8 @@ public class JobService extends BasicService implements InitializingBean {
 
                 optimizeJobList.add(optimizeJob);
                 optimizeJobInstances.add(getSingleJobInstance(optimizeJob));
+
+                optimizeSegment.setLastBuildJobID(optimizeJob.getId());
             }
 
             /** Add checkpoint job for batch jobs */
@@ -362,6 +369,8 @@ public class JobService extends BasicService implements InitializingBean {
 
             getExecutableManager().addJob(checkpointJob);
 
+            getCubeManager().updateCubeSegments(cube, optimizeSegments);
+
             try {
                 sendTriggerOptimizeMail(cube, checkpointJob.getName(), 
checkpointJob.getProjectName(), submitter);
             } catch (Exception e) {
@@ -457,6 +466,9 @@ public class JobService extends BasicService implements InitializingBean {
 
         getExecutableManager().addJob(optimizeJob);
 
+        optimizeSegment.setLastBuildJobID(optimizeJob.getId());
+        getCubeManager().updateCubeSegments(cubeInstance, optimizeSegment);
+        
         JobInstance optimizeJobInstance = getSingleJobInstance(optimizeJob);
 
         /** Update the checkpoint job */
@@ -531,6 +543,33 @@ public class JobService extends BasicService implements InitializingBean {
         }
     }
 
+    private void checkBuildingSegment(CubeInstance cube) {
+        checkBuildingSegment(cube, cube.getConfig().getMaxBuildingSegments());
+    }
+
+    private void checkBuildingSegment(CubeInstance cube, int maxBuildingSeg) {
+        if (cube.getBuildingSegments().size() < maxBuildingSeg) {
+            return;
+        }
+
+        int buildingSegments = 0;
+        for (CubeSegment segment : cube.getBuildingSegments()) {
+            buildingSegments++;
+            
+            String jobId = segment.getLastBuildJobID();
+            if (!StringUtil.isEmpty(jobId)) {
+                AbstractExecutable job = getExecutableManager().getJob(jobId);
+                if (job != null && "SYSTEM".equalsIgnoreCase(job.getSubmitter())) {
+                    // exclude the system jobs
+                    buildingSegments--;
+                }
+            }
+        }
+        if (buildingSegments >= maxBuildingSeg) {
+            throw new TooManyRequestException("There is already " + buildingSegments + " building segment; ");
+        }
+    }
+
     public JobInstance getJobInstance(String uuid) {
         AbstractExecutable job = getExecutableManager().getJob(uuid);
         if (job instanceof CheckpointExecutable) {
diff --git a/server/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
index a617b7c..b2eba98 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
@@ -18,6 +18,14 @@
 
 package org.apache.kylin.rest.service;
 
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.model.CubeBuildTypeEnum;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.job.constant.JobTimeFilterEnum;
 import org.apache.kylin.job.exception.ExecuteException;
@@ -28,18 +36,16 @@ import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.ExecuteResult;
 import org.apache.kylin.job.execution.Output;
+import org.apache.kylin.metadata.model.SegmentRange;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.query.QueryConnection;
+import org.apache.kylin.rest.exception.TooManyRequestException;
 import org.junit.Assert;
 import org.junit.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
 
-import java.io.IOException;
-import java.sql.SQLException;
-import java.util.Collections;
-import java.util.List;
-
 /**
  * @author xduo
  */
@@ -60,7 +66,8 @@ public class JobServiceTest extends ServiceTestBase {
         Assert.assertNotNull(jobService.getDataModelManager());
         
Assert.assertNotNull(QueryConnection.getConnection(ProjectInstance.DEFAULT_PROJECT_NAME));
         Assert.assertNull(jobService.getJobInstance("job_not_exist"));
-        Assert.assertNotNull(jobService.searchJobs(null, null, null, 0, 0, 
JobTimeFilterEnum.ALL, JobService.JobSearchMode.ALL));
+        Assert.assertNotNull(
+                jobService.searchJobs(null, null, null, 0, 0, 
JobTimeFilterEnum.ALL, JobService.JobSearchMode.ALL));
     }
 
     @Test
@@ -74,6 +81,53 @@ public class JobServiceTest extends ServiceTestBase {
         Assert.assertEquals(0, jobs.size());
     }
 
+    @Test
+    public void testConcurrencyJobs() throws IOException {
+        int maxBuildingSegments = 5;
+
+        System.setProperty("kylin.cube.max-building-segments", 
String.valueOf(maxBuildingSegments));
+        System.setProperty("kylin.cube.ignore-signature-inconsistency", 
"true");
+        CubeManager cubeManager = 
CubeManager.getInstance(jobService.getConfig());
+        CubeInstance cube = cubeManager.getCube("ssb");
+
+        jobService.submitJobInternal(cube, new SegmentRange.TSRange(1L, 
136468L), null, null, null,
+                CubeBuildTypeEnum.BUILD, false, "test", 1);
+        cube = cubeManager.getCube(cube.getName());
+        cubeManager.updateCubeSegStatus(cube.getSegments().get(0), 
SegmentStatusEnum.READY);
+
+        int i = 1;
+        for (; i <= maxBuildingSegments - 1; i++) {
+            cube = cubeManager.getCube(cube.getName());
+            jobService.submitJobInternal(cube, new 
SegmentRange.TSRange(136468L * i, 136468L * (i + 1)), null, null,
+                    null, CubeBuildTypeEnum.BUILD, false, "test", 1);
+        }
+
+        jobService.submitJobInternal(cube, new SegmentRange.TSRange(136468L * 
i, 136468L * (i + 1)), null, null, null,
+                CubeBuildTypeEnum.BUILD, false, "SYSTEM", 1);
+        i++;
+
+        Exception ex = null;
+        try {
+            cube = cubeManager.getCube(cube.getName());
+            jobService.submitJobInternal(cube, new 
SegmentRange.TSRange(136468L * i, 136468L * (i + 1)), null, null,
+                    null, CubeBuildTypeEnum.BUILD, false, "test", 1);
+            i++;
+        } catch (TooManyRequestException e) {
+            ex = e;
+        }
+        Assert.assertNull(ex);
+
+        try {
+            cube = cubeManager.getCube(cube.getName());
+            jobService.submitJobInternal(cube, new 
SegmentRange.TSRange(136468L * i, 136468L * (i + 1)), null, null,
+                    null, CubeBuildTypeEnum.BUILD, false, "test", 1);
+            i++;
+        } catch (TooManyRequestException e) {
+            ex = e;
+        }
+        Assert.assertNotNull(ex);
+    }
+
     public static class TestJob extends CubingJob {
 
         public TestJob() {
diff --git a/tool/src/main/java/org/apache/kylin/tool/job/CubeBuildingCLI.java b/tool/src/main/java/org/apache/kylin/tool/job/CubeBuildingCLI.java
index c651728..41a2398 100644
--- a/tool/src/main/java/org/apache/kylin/tool/job/CubeBuildingCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/job/CubeBuildingCLI.java
@@ -96,7 +96,7 @@ public class CubeBuildingCLI extends AbstractApplication {
         Preconditions.checkArgument(cube != null, "Cube named " + cubeName + " 
does not exist!!!");
         CubeBuildTypeEnum buildTypeEnum = CubeBuildTypeEnum.valueOf(buildType);
         Preconditions.checkArgument(buildTypeEnum != null, "Build type named " 
+ buildType + " does not exist!!!");
-        submitJob(cube, new TSRange(startDate, endDate), buildTypeEnum, false, 
"SYSTEM");
+        submitJob(cube, new TSRange(startDate, endDate), buildTypeEnum, false, "SYSTEM2");
     }
 
     private void submitJob(CubeInstance cube, TSRange tsRange, 
CubeBuildTypeEnum buildType,
@@ -105,19 +105,22 @@ public class CubeBuildingCLI extends AbstractApplication {
 
         DefaultChainedExecutable job;
 
+        CubeSegment newSeg;
         if (buildType == CubeBuildTypeEnum.BUILD) {
-            CubeSegment newSeg = cubeManager.appendSegment(cube, tsRange);
+            newSeg = cubeManager.appendSegment(cube, tsRange);
             job = EngineFactory.createBatchCubingJob(newSeg, submitter, null);
         } else if (buildType == CubeBuildTypeEnum.MERGE) {
-            CubeSegment newSeg = cubeManager.mergeSegments(cube, tsRange, 
null, forceMergeEmptySeg);
+            newSeg = cubeManager.mergeSegments(cube, tsRange, null, 
forceMergeEmptySeg);
             job = EngineFactory.createBatchMergeJob(newSeg, submitter);
         } else if (buildType == CubeBuildTypeEnum.REFRESH) {
-            CubeSegment refreshSeg = cubeManager.refreshSegment(cube, tsRange, 
null);
-            job = EngineFactory.createBatchCubingJob(refreshSeg, submitter, 
null);
+            newSeg = cubeManager.refreshSegment(cube, tsRange, null);
+            job = EngineFactory.createBatchCubingJob(newSeg, submitter, null);
         } else {
             throw new JobException("invalid build type:" + buildType);
         }
+
         executableManager.addJob(job);
+        cubeManager.updateCubeSegments(cube, newSeg);
     }
 
     private void checkCubeDescSignature(CubeInstance cube) {

Reply via email to