Repository: kylin Updated Branches: refs/heads/master 59a30f66d -> fc1e11aa8
KYLIN-2245 slim Segments in CubeMananger Signed-off-by: Yang Li <liy...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/0a441c3f Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/0a441c3f Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/0a441c3f Branch: refs/heads/master Commit: 0a441c3fa30aee0a06cfc6301f7fbfa412103179 Parents: 59a30f6 Author: Cheng Wang <cheng.w...@kyligence.io> Authored: Mon Dec 5 16:17:31 2016 +0800 Committer: Yang Li <liy...@apache.org> Committed: Mon Dec 5 20:20:30 2016 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/cube/CubeInstance.java | 14 ++ .../java/org/apache/kylin/cube/CubeManager.java | 165 +-------------- .../apache/kylin/metadata/model/ISegment.java | 5 +- .../apache/kylin/metadata/model/Segments.java | 206 ++++++++++++++++++- 4 files changed, 221 insertions(+), 169 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/0a441c3f/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java index 8b12c2e..ecbb437 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java @@ -18,6 +18,7 @@ package org.apache.kylin.cube; +import java.io.IOException; import java.util.List; import java.util.Set; @@ -25,6 +26,7 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.KylinConfigExt; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.RootPersistentEntity; +import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.metadata.model.ColumnDesc; import org.apache.kylin.metadata.model.DataModelDesc; @@ -361,6 +363,18 @@ public class CubeInstance extends RootPersistentEntity implements IRealization, return this.getDescriptor().getAutoMergeTimeRanges() != null && this.getDescriptor().getAutoMergeTimeRanges().length > 0; } + public Pair<Long, Long> autoMergeCubeSegments() throws IOException { + return segments.autoMergeCubeSegments(needAutoMerge(), getName(), getDescriptor().getAutoMergeTimeRanges()); + } + + public Segments calculateToBeSegments(CubeSegment newSegment) { + return segments.calculateToBeSegments(newSegment, getModel().getPartitionDesc().isPartitioned()); + } + + public Pair<CubeSegment, CubeSegment> findMergeOffsetsByDateRange(Segments<CubeSegment> segs, long startDate, long endDate, long skipSegDateRangeCap) { + return this.segments.findMergeOffsetsByDateRange(segs, startDate, endDate, skipSegDateRangeCap); + } + public CubeSegment getLastSegment() { List<CubeSegment> existing = getSegments(); if (existing.isEmpty()) { http://git-wip-us.apache.org/repos/asf/kylin/blob/0a441c3f/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index 4ba29af..296a4e7 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -27,7 +27,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Random; @@ -520,7 +519,7 @@ public class CubeManager implements IRealizationProvider { if (isOffsetsOn) { // offset cube, merge by date range? if (startOffset == endOffset) { - Pair<CubeSegment, CubeSegment> pair = findMergeOffsetsByDateRange(cube.getSegments(SegmentStatusEnum.READY), startDate, endDate, Long.MAX_VALUE); + Pair<CubeSegment, CubeSegment> pair = cube.findMergeOffsetsByDateRange(cube.getSegments(SegmentStatusEnum.READY), startDate, endDate, Long.MAX_VALUE); if (pair == null) throw new IllegalArgumentException("Find no segments to merge by date range " + startDate + "-" + endDate + " for cube " + cube); startOffset = pair.getFirst().getSourceOffsetStart(); @@ -580,32 +579,6 @@ public class CubeManager implements IRealizationProvider { return newSegment; } - private Pair<CubeSegment, CubeSegment> findMergeOffsetsByDateRange(List<CubeSegment> segments, long startDate, long endDate, long skipSegDateRangeCap) { - // must be offset cube - LinkedList<CubeSegment> result = Lists.newLinkedList(); - for (CubeSegment seg : segments) { - - // include if date range overlaps - if (startDate < seg.getDateRangeEnd() && seg.getDateRangeStart() < endDate) { - - // reject too big segment - if (seg.getDateRangeEnd() - seg.getDateRangeStart() > skipSegDateRangeCap) - break; - - // reject holes - if (result.size() > 0 && result.getLast().getSourceOffsetEnd() != seg.getSourceOffsetStart()) - break; - - result.add(seg); - } - } - - if (result.size() <= 1) - return null; - else - return Pair.newPair(result.getFirst(), result.getLast()); - } - public static long minDateRangeStart(List<CubeSegment> mergingSegments) { long min = Long.MAX_VALUE; for (CubeSegment seg : mergingSegments) @@ -708,50 +681,7 @@ public class CubeManager implements IRealizationProvider { } public Pair<Long, Long> autoMergeCubeSegments(CubeInstance cube) throws IOException { - if (!cube.needAutoMerge()) { - logger.debug("Cube " + cube.getName() + " doesn't need auto merge"); - return null; - } - - List<CubeSegment> buildingSegs = cube.getBuildingSegments(); - if (buildingSegs.size() > 0) { - logger.debug("Cube " + cube.getName() + " has " + buildingSegs.size() + " building segments"); - } - - List<CubeSegment> readySegs = cube.getSegments(SegmentStatusEnum.READY); - - List<CubeSegment> mergingSegs = Lists.newArrayList(); - if (buildingSegs.size() > 0) { - - for (CubeSegment building : buildingSegs) { - // exclude those under-merging segs - for (CubeSegment ready : readySegs) { - if (ready.getSourceOffsetStart() >= building.getSourceOffsetStart() && ready.getSourceOffsetEnd() <= building.getSourceOffsetEnd()) { - mergingSegs.add(ready); - } - } - } - } - - // exclude those already under merging segments - readySegs.removeAll(mergingSegs); - - long[] timeRanges = cube.getDescriptor().getAutoMergeTimeRanges(); - Arrays.sort(timeRanges); - - for (int i = timeRanges.length - 1; i >= 0; i--) { - long toMergeRange = timeRanges[i]; - - for (int s = 0; s < readySegs.size(); s++) { - CubeSegment seg = readySegs.get(s); - Pair<CubeSegment, CubeSegment> p = findMergeOffsetsByDateRange(readySegs.subList(s, readySegs.size()), // - seg.getDateRangeStart(), seg.getDateRangeStart() + toMergeRange, toMergeRange); - if (p != null && p.getSecond().getDateRangeEnd() - p.getFirst().getDateRangeStart() >= toMergeRange) - return Pair.newPair(p.getFirst().getSourceOffsetStart(), p.getSecond().getSourceOffsetEnd()); - } - } - - return null; + return cube.autoMergeCubeSegments(); } public void promoteNewlyBuiltSegments(CubeInstance cube, CubeSegment newSegment) throws IOException { @@ -765,7 +695,7 @@ public class CubeManager implements IRealizationProvider { logger.warn("For cube " + cube + ", segment " + newSegment + " state should be NEW but is READY"); } - List<CubeSegment> tobe = calculateToBeSegments(cube, newSegment); + List<CubeSegment> tobe = cube.calculateToBeSegments(newSegment); if (tobe.contains(newSegment) == false) throw new IllegalStateException("For cube " + cube + ", segment " + newSegment + " is expected but not in the tobe " + tobe); @@ -786,104 +716,17 @@ public class CubeManager implements IRealizationProvider { } public void validateNewSegments(CubeInstance cube, CubeSegment newSegments) { - List<CubeSegment> tobe = calculateToBeSegments(cube, newSegments); + List<CubeSegment> tobe = cube.calculateToBeSegments(newSegments); List<CubeSegment> newList = Arrays.asList(newSegments); if (tobe.containsAll(newList) == false) { throw new IllegalStateException("For cube " + cube + ", the new segments " + newList + " do not fit in its current " + cube.getSegments() + "; the resulted tobe is " + tobe); } } - /** - * Smartly figure out the TOBE segments once all new segments are built. - * - Ensures no gap, no overlap - * - Favors new segments over the old - * - Favors big segments over the small - */ - private List<CubeSegment> calculateToBeSegments(CubeInstance cube, CubeSegment newSegments) { - - List<CubeSegment> tobe = Lists.newArrayList(cube.getSegments()); - if (newSegments != null && !tobe.contains(newSegments)) { - tobe.add(newSegments); - } - if (tobe.size() == 0) - return tobe; - - // sort by source offset - Collections.sort(tobe); - - CubeSegment firstSeg = tobe.get(0); - firstSeg.validate(); - - for (int i = 0, j = 1; j < tobe.size();) { - CubeSegment is = tobe.get(i); - CubeSegment js = tobe.get(j); - js.validate(); - - // check i is either ready or new - if (!isNew(is) && !isReady(is)) { - tobe.remove(i); - continue; - } - - // check j is either ready or new - if (!isNew(js) && !isReady(js)) { - tobe.remove(j); - continue; - } - - if (is.getSourceOffsetStart() == js.getSourceOffsetStart()) { - // if i, j competes - if (isReady(is) && isReady(js) || isNew(is) && isNew(js)) { - // if both new or ready, favor the bigger segment - if (is.getSourceOffsetEnd() <= js.getSourceOffsetEnd()) { - tobe.remove(i); - } else { - tobe.remove(j); - } - continue; - } else { - // otherwise, favor the new segment - if (isNew(is) && is.equals(newSegments)) { - tobe.remove(j); - continue; - } else if (js.equals(newSegments)) { - tobe.remove(i); - continue; - } - } - } - - // if i, j in sequence - if (is.getSourceOffsetEnd() <= js.getSourceOffsetStart()) { - i++; - j++; - continue; - } - - // js can be covered by is - if (is.equals(newSegments)) { - // seems j not fitting - tobe.remove(j); - continue; - } else { - i++; - j++; - continue; - } - - } - - return tobe; - } - private boolean isReady(CubeSegment seg) { return seg.getStatus() == SegmentStatusEnum.READY; } - private boolean isNew(CubeSegment seg) { - return seg.getStatus() == SegmentStatusEnum.NEW || seg.getStatus() == SegmentStatusEnum.READY_PENDING; - } - private void loadAllCubeInstance() throws IOException { ResourceStore store = getStore(); List<String> paths = store.collectResourceRecursively(ResourceStore.CUBE_RESOURCE_ROOT, ".json"); http://git-wip-us.apache.org/repos/asf/kylin/blob/0a441c3f/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java index e3fcdcb..9d26927 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java @@ -18,7 +18,7 @@ package org.apache.kylin.metadata.model; -public interface ISegment{ +public interface ISegment { public String getName(); @@ -29,11 +29,12 @@ public interface ISegment{ public long getSourceOffsetStart(); public long getSourceOffsetEnd(); - + public DataModelDesc getModel(); public SegmentStatusEnum getStatus(); public long getLastBuildTime(); + public boolean isSourceOffsetsOn(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/0a441c3f/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java index f0a58cb..bc115cc 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java @@ -18,16 +18,25 @@ package org.apache.kylin.metadata.model; +import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; + +import org.apache.kylin.common.util.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class Segments<T extends ISegment> extends ArrayList<T> { - + private static final long serialVersionUID = 1L; + private static final Logger logger = LoggerFactory.getLogger(Segments.class); + public static boolean sourceOffsetContains(ISegment a, ISegment b) { return a.getSourceOffsetStart() <= b.getSourceOffsetStart() && b.getSourceOffsetEnd() <= a.getSourceOffsetEnd(); } - + public static boolean sourceOffsetOverlaps(ISegment a, ISegment b) { return a.getSourceOffsetStart() < b.getSourceOffsetEnd() && b.getSourceOffsetStart() < a.getSourceOffsetEnd(); } @@ -44,7 +53,7 @@ public class Segments<T extends ISegment> extends ArrayList<T> { Segments<T> readySegs = getSegments(SegmentStatusEnum.READY); long startTime = Long.MAX_VALUE; - for (T seg : readySegs) { + for (ISegment seg : readySegs) { startTime = Math.min(startTime, seg.getDateRangeStart()); } @@ -55,7 +64,7 @@ public class Segments<T extends ISegment> extends ArrayList<T> { Segments<T> readySegs = getSegments(SegmentStatusEnum.READY); long endTime = Long.MIN_VALUE; - for (T seg : readySegs) { + for (ISegment seg : readySegs) { endTime = Math.max(endTime, seg.getDateRangeEnd()); } @@ -87,7 +96,7 @@ public class Segments<T extends ISegment> extends ArrayList<T> { return latest; } - public Segments getSegments(SegmentStatusEnum status) { + public Segments<T> getSegments(SegmentStatusEnum status) { Segments<T> result = new Segments<>(); for (T segment : this) { @@ -107,7 +116,7 @@ public class Segments<T extends ISegment> extends ArrayList<T> { return null; } - public Segments getBuildingSegments() { + public Segments<T> getBuildingSegments() { Segments<T> buildingSegments = new Segments(); if (null != this) { for (T segment : this) { @@ -142,9 +151,194 @@ public class Segments<T extends ISegment> extends ArrayList<T> { return result; } + public Pair<Long, Long> autoMergeCubeSegments(boolean needAutoMerge, String cubeName, long[] timeRanges) throws IOException { + if (!needAutoMerge) { + logger.debug("Cube " + cubeName + " doesn't need auto merge"); + return null; + } + + int buildingSize = getBuildingSegments().size(); + if (buildingSize > 0) { + logger.debug("Cube " + cubeName + " has " + buildingSize + " building segments"); + } + + Segments<T> readySegs = getSegments(SegmentStatusEnum.READY); + + Segments mergingSegs = new Segments(); + if (buildingSize > 0) { + + for (ISegment building : getBuildingSegments()) { + // exclude those under-merging segs + for (ISegment ready : readySegs) { + if (ready.getSourceOffsetStart() >= building.getSourceOffsetStart() && ready.getSourceOffsetEnd() <= building.getSourceOffsetEnd()) { + mergingSegs.add(ready); + } + } + } + } + + // exclude those already under merging segments + readySegs.removeAll(mergingSegs); + + Arrays.sort(timeRanges); + + for (int i = timeRanges.length - 1; i >= 0; i--) { + long toMergeRange = timeRanges[i]; + + for (int s = 0; s < readySegs.size(); s++) { + ISegment seg = readySegs.get(s); + Pair<T, T> p = findMergeOffsetsByDateRange(readySegs.getSubList(s, readySegs.size()), // + seg.getDateRangeStart(), seg.getDateRangeStart() + toMergeRange, toMergeRange); + if (p != null && p.getSecond().getDateRangeEnd() - p.getFirst().getDateRangeStart() >= toMergeRange) + return Pair.newPair(p.getFirst().getSourceOffsetStart(), p.getSecond().getSourceOffsetEnd()); + } + } + + return null; + } + + public Pair<T, T> findMergeOffsetsByDateRange(Segments<T> segments, long startDate, long endDate, long skipSegDateRangeCap) { + // must be offset cube + Segments result = new Segments(); + for (ISegment seg : segments) { + + // include if date range overlaps + if (startDate < seg.getDateRangeEnd() && seg.getDateRangeStart() < endDate) { + + // reject too big segment + if (seg.getDateRangeEnd() - seg.getDateRangeStart() > skipSegDateRangeCap) + break; + + // reject holes + if (result.size() > 0 && result.getLast().getSourceOffsetEnd() != seg.getSourceOffsetStart()) + break; + + result.add(seg); + } + } + + if (result.size() <= 1) + return null; + else + return (Pair<T, T>) Pair.newPair(result.getFirst(), result.getLast()); + } + + /** + * Smartly figure out the TOBE segments once all new segments are built. + * - Ensures no gap, no overlap + * - Favors new segments over the old + * - Favors big segments over the small + */ + public Segments calculateToBeSegments(ISegment newSegment, boolean isPartitioned) { + + Segments tobe = (Segments) this.clone(); + if (newSegment != null && !tobe.contains(newSegment)) { + tobe.add(newSegment); + } + if (tobe.size() == 0) + return tobe; + + // sort by source offset + Collections.sort(tobe); + + ISegment firstSeg = tobe.getFirst(); + validate(firstSeg, isPartitioned); + + for (int i = 0, j = 1; j < tobe.size();) { + ISegment is = (ISegment) tobe.get(i); + ISegment js = (ISegment) tobe.get(j); + validate(js, isPartitioned); + + // check i is either ready or new + if (!isNew(is) && !isReady(is)) { + tobe.remove(i); + continue; + } + + // check j is either ready or new + if (!isNew(js) && !isReady(js)) { + tobe.remove(j); + continue; + } + + if (is.getSourceOffsetStart() == js.getSourceOffsetStart()) { + // if i, j competes + if (isReady(is) && isReady(js) || isNew(is) && isNew(js)) { + // if both new or ready, favor the bigger segment + if (is.getSourceOffsetEnd() <= js.getSourceOffsetEnd()) { + tobe.remove(i); + } else { + tobe.remove(j); + } + continue; + } else { + // otherwise, favor the new segment + if (isNew(is) && is.equals(newSegment)) { + tobe.remove(j); + continue; + } else if (js.equals(newSegment)) { + tobe.remove(i); + continue; + } + } + } + + // if i, j in sequence + if (is.getSourceOffsetEnd() <= js.getSourceOffsetStart()) { + i++; + j++; + continue; + } + + // js can be covered by is + if (is.equals(newSegment)) { + // seems j not fitting + tobe.remove(j); + continue; + } else { + i++; + j++; + continue; + } + + } + + return tobe; + } + + private void validate(ISegment seg, boolean isPartitioned) { + if (isPartitioned) { + if (!seg.isSourceOffsetsOn() && seg.getDateRangeStart() >= seg.getDateRangeEnd()) + throw new IllegalStateException("Invalid segment, dateRangeStart(" + seg.getDateRangeStart() + ") must be smaller than dateRangeEnd(" + seg.getDateRangeEnd() + ") in segment " + seg); + if (seg.isSourceOffsetsOn() && seg.getSourceOffsetStart() >= seg.getSourceOffsetEnd()) + throw new IllegalStateException("Invalid segment, sourceOffsetStart(" + seg.getSourceOffsetStart() + ") must be smaller than sourceOffsetEnd(" + seg.getSourceOffsetEnd() + ") in segment " + seg); + } + } + + private boolean isReady(ISegment seg) { + return seg.getStatus() == SegmentStatusEnum.READY; + } + + private boolean isNew(ISegment seg) { + return seg.getStatus() == SegmentStatusEnum.NEW || seg.getStatus() == SegmentStatusEnum.READY_PENDING; + } + private T getLast() { assert this.size() != 0; return this.get(this.size() - 1); } + private T getFirst() { + assert this.size() != 0; + return this.get(0); + } + + private Segments<T> getSubList(int from, int to) { + Segments<T> result = new Segments<>(); + for (T seg : this.subList(from, to)) { + result.add(seg); + } + return result; + } + } \ No newline at end of file