This is an automated email from the ASF dual-hosted git repository. nic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push: new 8a8d1b0 KYLIN-4273 Make cube planner works for real-time streaming job 8a8d1b0 is described below commit 8a8d1b0a081b0040284bb612254bce5007cf0729 Author: Ma,Gang <ga...@ebay.com> AuthorDate: Thu Nov 28 19:38:20 2019 +0800 KYLIN-4273 Make cube planner works for real-time streaming job --- .../kylin/stream/coordinator/Coordinator.java | 36 +++++++++++++++++- .../coordinator/coordinate/BuildJobSubmitter.java | 44 +++++++++++++++++++--- .../coordinate/BuildJobSubmitterTest.java | 16 ++++---- .../coordinator/coordinate/StreamingTestBase.java | 9 +++++ 4 files changed, 91 insertions(+), 14 deletions(-) diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java index 46a9bcf..938c0b4 100644 --- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java +++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java @@ -56,6 +56,7 @@ import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.mr.CubingJob; import org.apache.kylin.engine.mr.StreamingCubingEngine; +import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.DefaultChainedExecutable; import org.apache.kylin.job.execution.ExecutableManager; import org.apache.kylin.job.execution.ExecutableState; @@ -1133,12 +1134,20 @@ public class Coordinator implements CoordinatorClient { private List<String> findSegmentsCanBuild(String cubeName) { List<String> result = Lists.newArrayList(); CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName); + // in optimization + if (isInOptimize(cubeInstance)) { + return result; + } + int allowMaxBuildingSegments = cubeInstance.getConfig().getMaxBuildingSegments(); CubeSegment latestHistoryReadySegment = cubeInstance.getLatestReadySegment(); long minSegmentStart = -1; if (latestHistoryReadySegment != null) { minSegmentStart = latestHistoryReadySegment.getTSRange().end.v; + } else { + // there is no ready segment, to make cube planner work, only 1 segment can build + logger.info("there is no ready segments for cube:{}, so only allow 1 segment build concurrently", cubeName); + allowMaxBuildingSegments = 1; } - int allowMaxBuildingSegments = cubeInstance.getConfig().getMaxBuildingSegments(); CubeAssignment assignments = streamMetadataStore.getAssignmentsByCube(cubeName); Set<Integer> cubeAssignedReplicaSets = assignments.getReplicaSetIDs(); @@ -1214,6 +1223,31 @@ public class Coordinator implements CoordinatorClient { return result; } + private boolean isInOptimize(CubeInstance cube) { + Segments<CubeSegment> readyPendingSegments = cube.getSegments(SegmentStatusEnum.READY_PENDING); + if (readyPendingSegments.size() > 0) { + logger.info("The cube {} has READY_PENDING segments {}. It's not allowed for building", + cube.getName(), readyPendingSegments); + return true; + } + Segments<CubeSegment> newSegments = cube.getSegments(SegmentStatusEnum.NEW); + for (CubeSegment newSegment : newSegments) { + String jobId = newSegment.getLastBuildJobID(); + if (jobId == null) { + continue; + } + AbstractExecutable job = getExecutableManager().getJob(jobId); + if (job != null && job instanceof CubingJob) { + CubingJob cubingJob = (CubingJob) job; + if (CubingJob.CubingJobTypeEnum.OPTIMIZE.toString().equals(cubingJob.getJobType())) { + logger.info("The cube {} is in optimization. It's not allowed to build new segments during optimization.", cube.getName()); + return true; + } + } + } + return false; + } + /** * <pre> * When all replica sets have uploaded their local segment cache to remote, we can mark diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitter.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitter.java index 9e32f46..6f5bb0d 100644 --- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitter.java +++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitter.java @@ -27,10 +27,12 @@ import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.mr.CubingJob; import org.apache.kylin.engine.mr.StreamingCubingEngine; +import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.DefaultChainedExecutable; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.metadata.model.SegmentRange; import org.apache.kylin.metadata.model.SegmentStatusEnum; +import org.apache.kylin.metadata.model.Segments; import org.apache.kylin.stream.coordinator.StreamingCubeInfo; import org.apache.kylin.stream.coordinator.coordinate.annotations.NonSideEffect; import org.apache.kylin.stream.coordinator.coordinate.annotations.NotAtomicIdempotent; @@ -148,7 +150,7 @@ public class BuildJobSubmitter implements Runnable { if (checkTimes % 100 == 1) { logger.info("Force traverse all cubes periodically."); for (StreamingCubeInfo cubeInfo : coordinator.getEnableStreamingCubes()) { - List<String> segmentList = checkSegmentBuidJobFromMetadata(cubeInfo.getCubeName()); + List<String> segmentList = checkSegmentBuildJobFromMetadata(cubeInfo.getCubeName()); for (String segmentName : segmentList) { submitSegmentBuildJob(cubeInfo.getCubeName(), segmentName); } @@ -218,7 +220,7 @@ public class BuildJobSubmitter implements Runnable { Iterator<String> iterator = cubeCheckList.iterator(); while (iterator.hasNext()) { String cubeName = iterator.next(); - List<String> segmentList = checkSegmentBuidJobFromMetadata(cubeName); + List<String> segmentList = checkSegmentBuildJobFromMetadata(cubeName); boolean allSubmited = true; for (String segmentName : segmentList) { boolean ok = submitSegmentBuildJob(cubeName, segmentName); @@ -241,16 +243,23 @@ public class BuildJobSubmitter implements Runnable { * @return list of segment which could be submitted a segment build job */ @NonSideEffect - List<String> checkSegmentBuidJobFromMetadata(String cubeName) { + List<String> checkSegmentBuildJobFromMetadata(String cubeName) { List<String> result = Lists.newArrayList(); CubeInstance cubeInstance = coordinator.getCubeManager().getCube(cubeName); + // in optimization + if (isInOptimize(cubeInstance)) { + return result; + } + int allowMaxBuildingSegments = cubeInstance.getConfig().getMaxBuildingSegments(); CubeSegment latestHistoryReadySegment = cubeInstance.getLatestReadySegment(); - long minSegmentStart = -1; if (latestHistoryReadySegment != null) { minSegmentStart = latestHistoryReadySegment.getTSRange().end.v; + } else { + // there is no ready segment, to make cube planner work, only 1 segment can build + logger.info("there is no ready segments for cube:{}, so only allow 1 segment build concurrently", cubeName); + allowMaxBuildingSegments = 1; } - int allowMaxBuildingSegments = cubeInstance.getConfig().getMaxBuildingSegments(); CubeAssignment assignments = coordinator.getStreamMetadataStore().getAssignmentsByCube(cubeName); Set<Integer> cubeAssignedReplicaSets = assignments.getReplicaSetIDs(); @@ -308,6 +317,31 @@ public class BuildJobSubmitter implements Runnable { return result; } + private boolean isInOptimize(CubeInstance cube) { + Segments<CubeSegment> readyPendingSegments = cube.getSegments(SegmentStatusEnum.READY_PENDING); + if (readyPendingSegments.size() > 0) { + logger.info("The cube {} has READY_PENDING segments {}. It's not allowed for building", + cube.getName(), readyPendingSegments); + return true; + } + Segments<CubeSegment> newSegments = cube.getSegments(SegmentStatusEnum.NEW); + for (CubeSegment newSegment : newSegments) { + String jobId = newSegment.getLastBuildJobID(); + if (jobId == null) { + continue; + } + AbstractExecutable job = coordinator.getExecutableManager().getJob(jobId); + if (job != null && job instanceof CubingJob) { + CubingJob cubingJob = (CubingJob) job; + if (CubingJob.CubingJobTypeEnum.OPTIMIZE.toString().equals(cubingJob.getJobType())) { + logger.info("The cube {} is in optimization. It's not allowed to build new segments during optimization.", cube.getName()); + return true; + } + } + } + return false; + } + /** * Submit a build job for streaming segment * diff --git a/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitterTest.java b/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitterTest.java index 95a0f05..044534c 100644 --- a/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitterTest.java +++ b/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitterTest.java @@ -103,7 +103,7 @@ public class BuildJobSubmitterTest extends StreamingTestBase { assertEquals(0, buildJobSubmitter.getCubeCheckList().size()); } - void prepareTestCheckSegmentBuidJobFromMetadata() { + void prepareTestCheckSegmentBuildJobFromMetadata() { CubeSegment cubeSegment = stubCubSegment(SegmentStatusEnum.NEW, 100L, 200L); CubeInstance cubeInstance = stubCubeInstance(cubeSegment); config = stubKylinConfig(); @@ -124,24 +124,24 @@ public class BuildJobSubmitterTest extends StreamingTestBase { } @Test - public void testCheckSegmentBuidJobFromMetadata() { - prepareTestCheckSegmentBuidJobFromMetadata(); + public void testCheckSegmentBuildJobFromMetadata() { + prepareTestCheckSegmentBuildJobFromMetadata(); BuildJobSubmitter buildJobSubmitter = new BuildJobSubmitter(streamingCoordinator); buildJobSubmitter.restore(); - List<String> segmentReadyList = buildJobSubmitter.checkSegmentBuidJobFromMetadata(cubeName2); + List<String> segmentReadyList = buildJobSubmitter.checkSegmentBuildJobFromMetadata(cubeName2); assertEquals(1, segmentReadyList.size()); - segmentReadyList = buildJobSubmitter.checkSegmentBuidJobFromMetadata(cubeName3); + segmentReadyList = buildJobSubmitter.checkSegmentBuildJobFromMetadata(cubeName3); assertEquals(1, segmentReadyList.size()); } @Test - public void testCheckSegmentBuidJobFromMetadata1() { - prepareTestCheckSegmentBuidJobFromMetadata(); + public void testCheckSegmentBuildJobFromMetadata1() { + prepareTestCheckSegmentBuildJobFromMetadata(); BuildJobSubmitter buildJobSubmitter = new BuildJobSubmitter(streamingCoordinator); buildJobSubmitter.restore(); - List<String> segmentReadyList = buildJobSubmitter.checkSegmentBuidJobFromMetadata(cubeName4); + List<String> segmentReadyList = buildJobSubmitter.checkSegmentBuildJobFromMetadata(cubeName4); verify(executableManager, times(1)).resumeJob(eq(mockBuildJob4)); assertEquals(0, segmentReadyList.size()); } diff --git a/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/StreamingTestBase.java b/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/StreamingTestBase.java index 5308d79..c7b1ac5 100644 --- a/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/StreamingTestBase.java +++ b/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/StreamingTestBase.java @@ -315,14 +315,23 @@ public class StreamingTestBase extends LocalFileMetadataTestCase { CubeInstance stubCubeInstance(CubeSegment cubSegment) { CubeInstance cubeInstance = mock(CubeInstance.class); + CubeSegment readySegment = stubCubSegment(SegmentStatusEnum.READY, 0L, 1L); when(cubeInstance.latestCopyForWrite()).thenReturn(cubeInstance); @SuppressWarnings("unchecked") Segments<CubeSegment> segmentSegments = mock(Segments.class, RETURNS_DEEP_STUBS); + Segments<CubeSegment> optimizedSegments = mock(Segments.class, RETURNS_DEEP_STUBS); + when(segmentSegments.size()).thenReturn(1); when(cubeInstance.getBuildingSegments()).thenReturn(segmentSegments); when(cubeInstance.getName()).thenReturn(cubeName1); when(cubeInstance.getSegment(anyString(), Matchers.any())).thenReturn(cubSegment); + + when(optimizedSegments.size()).thenReturn(0); + when(cubeInstance.getLatestReadySegment()).thenReturn(readySegment); + when(cubeInstance.getSegments(SegmentStatusEnum.READY_PENDING)).thenReturn(optimizedSegments); + when(cubeInstance.getSegments(SegmentStatusEnum.NEW)).thenReturn(segmentSegments); + return cubeInstance; }