http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-metadata/src/main/java/org/apache/kylin/metadata/model/SegmentRange.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/SegmentRange.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/SegmentRange.java new file mode 100644 index 0000000..e52da09 --- /dev/null +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/SegmentRange.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.metadata.model; + +import java.io.Serializable; +import java.util.Comparator; + +/** + * SegmentRange and TSRange seem similar but are different concepts. + * + * - SegmentRange defines the range of a segment. + * - TSRange is the time series range of the segment data. + * - When segment range is defined by time, the two can be the same, in that case TSRange is a kind of SegmentRange. + * - Duration segment creation (build/refresh/merge), a new segment is defined by either one of the two, not both. + * - And the choice must be consistent across all following segment creation. + */ +@SuppressWarnings("serial") +public class SegmentRange<T extends Comparable> implements Serializable { + + public final Endpoint<T> start; + public final Endpoint<T> end; + + public SegmentRange(Endpoint start, Endpoint end) { + this.start = start; + this.end = end; + checkState(); + } + + public SegmentRange(T start, T end) { + if (start != null && end != null && start.getClass() != end.getClass()) + throw new IllegalArgumentException(); + + this.start = new Endpoint(start, start == null, false); + this.end = new Endpoint(end, false, end == null); + checkState(); + } + + private void checkState() { + if (start.compareTo(end) > 0) + throw new IllegalStateException(); + } + + public boolean isInfinite() { + return start.isMin && end.isMax; + } + + public boolean contains(SegmentRange o) { + return this.start.compareTo(o.start) <= 0 && o.end.compareTo(this.end) <= 0; + } + + public boolean overlaps(SegmentRange o) { + return this.start.compareTo(o.end) < 0 && o.start.compareTo(this.end) < 0; + } + + public boolean connects(SegmentRange o) { + return this.end.compareTo(o.start) == 0; + } + + public boolean apartBefore(SegmentRange o) { + return this.end.compareTo(o.start) < 0; + } + + @Override + public String toString() { + return this.getClass().getSimpleName() + "[" + start + "," + end + ")"; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((end == null) ? 0 : end.hashCode()); + result = prime * result + ((start == null) ? 0 : start.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + SegmentRange other = (SegmentRange) obj; + if (end == null) { + if (other.end != null) + return false; + } else if (!end.equals(other.end)) + return false; + if (start == null) { + if (other.start != null) + return false; + } else if (!start.equals(other.start)) + return false; + return true; + } + + // ============================================================================ + + public static class TSRange extends SegmentRange<Long> { + + public TSRange(Long start, Long end) { + // [0, Long.MAX_VALUE) is full build (for historic reason) + super(new Endpoint(isInfinite(start, end) ? 0 : start, isInfinite(start, end), false), // + new Endpoint(isInfinite(start, end) ? Long.MAX_VALUE : end, false, isInfinite(start, end))); + } + + private static boolean isInfinite(Long start, Long end) { + return (start == null || start <= 0) && (end == null || end == Long.MAX_VALUE); + } + + public long duration() { + return end.v - start.v; + } + } + + // ============================================================================ + + // immutable + public static class Endpoint<T extends Comparable> implements Comparable<Endpoint>, Serializable { + + public static final Comparator<Endpoint> comparator = getComparator(new Comparator() { + @Override + public int compare(Object o1, Object o2) { + return ((Comparable) o1).compareTo(o2); + } + }); + + public static Comparator<Endpoint> getComparator(final Comparator valueComparator) { + return new Comparator<Endpoint>() { + @Override + public int compare(Endpoint a, Endpoint b) { + if (a.isMin) { + return b.isMin ? 0 : -1; + } else if (b.isMin) { + return a.isMin ? 0 : 1; + } else if (a.isMax) { + return b.isMax ? 0 : 1; + } else if (b.isMax) { + return a.isMax ? 0 : -1; + } else { + if (a == null || b == null) + throw new IllegalStateException(); + + return valueComparator.compare(a.v, b.v); + } + } + }; + } + + public final T v; + public final boolean isMin; + public final boolean isMax; + + private Endpoint(T v, boolean isMin, boolean isMax) { + this.v = v; + this.isMin = isMin; + this.isMax = isMax; + } + + @Override + public int compareTo(Endpoint o) { + return comparator.compare(this, o); + } + + @Override + public String toString() { + String s = "" + v; + if (isMin) + s += "[min]"; + if (isMax) + s += "[max]"; + return s; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + (isMax ? 1231 : 1237); + result = prime * result + (isMin ? 1231 : 1237); + result = prime * result + ((v == null) ? 0 : v.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + Endpoint other = (Endpoint) obj; + if (isMax != other.isMax) + return false; + if (isMin != other.isMin) + return false; + + return comparator.compare(this, other) == 0; + } + } +}
http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java index 73a385c..7af1ebc 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java @@ -23,25 +23,42 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.List; +import org.apache.kylin.common.util.ClassUtil; import org.apache.kylin.common.util.Pair; +import org.apache.kylin.metadata.model.SegmentRange.Endpoint; +import org.apache.kylin.metadata.model.SegmentRange.TSRange; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Lists; + public class Segments<T extends ISegment> extends ArrayList<T> implements Serializable{ private static final long serialVersionUID = 1L; private static final Logger logger = LoggerFactory.getLogger(Segments.class); - public static boolean sourceOffsetContains(ISegment a, ISegment b) { - return a.getSourceOffsetStart() <= b.getSourceOffsetStart() && b.getSourceOffsetEnd() <= a.getSourceOffsetEnd(); + public static ISegmentAdvisor newSegmentAdvisor(ISegment seg) { + try { + Class<? extends ISegmentAdvisor> clz = ClassUtil.forName(seg.getConfig().getSegmentAdvisor(), ISegmentAdvisor.class); + return clz.getConstructor(ISegment.class).newInstance(seg); + } catch (Exception e) { + throw new RuntimeException(e); + } } - - public static boolean sourceOffsetOverlaps(ISegment a, ISegment b) { - return a.getSourceOffsetStart() < b.getSourceOffsetEnd() && b.getSourceOffsetStart() < a.getSourceOffsetEnd(); + + // ============================================================================ + + public Segments() { + super(); } + public Segments(Segments<T> copy) { + super(copy); + } + public T getFirstSegment() { if (this == null || this.size() == 0) { return null; @@ -49,24 +66,24 @@ public class Segments<T extends ISegment> extends ArrayList<T> implements Serial return this.get(0); } } - - public long getDateRangeStart() { + + public long getTSStart() { Segments<T> readySegs = getSegments(SegmentStatusEnum.READY); long startTime = Long.MAX_VALUE; for (ISegment seg : readySegs) { - startTime = Math.min(startTime, seg.getDateRangeStart()); + startTime = Math.min(startTime, seg.getTSRange().start.v); } return startTime; } - public long getDateRangeEnd() { + public long getTSEnd() { Segments<T> readySegs = getSegments(SegmentStatusEnum.READY); long endTime = Long.MIN_VALUE; for (ISegment seg : readySegs) { - endTime = Math.max(endTime, seg.getDateRangeEnd()); + endTime = Math.max(endTime, seg.getTSRange().end.v); } return endTime; @@ -78,7 +95,7 @@ public class Segments<T extends ISegment> extends ArrayList<T> implements Serial T seg = this.get(i); if (seg.getStatus() != SegmentStatusEnum.READY) continue; - if (latest == null || latest.getDateRangeEnd() < seg.getDateRangeEnd()) { + if (latest == null || latest.getTSRange().end.v < seg.getTSRange().end.v) { latest = seg; } } @@ -129,7 +146,7 @@ public class Segments<T extends ISegment> extends ArrayList<T> implements Serial return buildingSegments; } - public Segments getMergingSegments(T mergedSegment) { + public Segments<T> getMergingSegments(T mergedSegment) { Segments<T> result = new Segments(); if (mergedSegment == null) return result; @@ -141,9 +158,9 @@ public class Segments<T extends ISegment> extends ArrayList<T> implements Serial if (seg == mergedSegment) continue; - if (sourceOffsetContains(mergedSegment, seg)) { + if (mergedSegment.getSegRange().contains(seg.getSegRange())) { // make sure no holes - if (result.size() > 0 && result.getLast().getSourceOffsetEnd() != seg.getSourceOffsetStart()) + if (result.size() > 0 && !result.getLast().getSegRange().connects(seg.getSegRange())) throw new IllegalStateException("Merging segments must not have gaps between " + result.getLast() + " and " + seg); result.add(seg); @@ -152,7 +169,7 @@ public class Segments<T extends ISegment> extends ArrayList<T> implements Serial return result; } - public Pair<Long, Long> autoMergeCubeSegments(boolean needAutoMerge, String cubeName, long[] timeRanges) throws IOException { + public SegmentRange autoMergeCubeSegments(boolean needAutoMerge, String cubeName, long[] timeRanges) throws IOException { if (!needAutoMerge) { logger.debug("Cube " + cubeName + " doesn't need auto merge"); return null; @@ -171,7 +188,7 @@ public class Segments<T extends ISegment> extends ArrayList<T> implements Serial for (ISegment building : getBuildingSegments()) { // exclude those under-merging segs for (ISegment ready : readySegs) { - if (ready.getSourceOffsetStart() >= building.getSourceOffsetStart() && ready.getSourceOffsetEnd() <= building.getSourceOffsetEnd()) { + if (building.getSegRange().contains(ready.getSegRange())) { mergingSegs.add(ready); } } @@ -188,30 +205,31 @@ public class Segments<T extends ISegment> extends ArrayList<T> implements Serial for (int s = 0; s < readySegs.size(); s++) { ISegment seg = readySegs.get(s); + TSRange tsRange = new TSRange(seg.getTSRange().start.v, seg.getTSRange().start.v + toMergeRange); Pair<T, T> p = readySegs.getSubList(s, readySegs.size()) // - .findMergeOffsetsByDateRange(seg.getDateRangeStart(), seg.getDateRangeStart() + toMergeRange, toMergeRange); - if (p != null && p.getSecond().getDateRangeEnd() - p.getFirst().getDateRangeStart() >= toMergeRange) - return Pair.newPair(p.getFirst().getSourceOffsetStart(), p.getSecond().getSourceOffsetEnd()); + .findMergeOffsetsByDateRange(tsRange, toMergeRange); + if (p != null && p.getSecond().getTSRange().end.v - p.getFirst().getTSRange().start.v >= toMergeRange) + return new SegmentRange(p.getFirst().getSegRange().start.v, p.getSecond().getSegRange().end.v); } } return null; } - public Pair<T, T> findMergeOffsetsByDateRange(long startDate, long endDate, long skipSegDateRangeCap) { + public Pair<T, T> findMergeOffsetsByDateRange(TSRange tsRange, long skipSegDateRangeCap) { // must be offset cube Segments result = new Segments(); for (ISegment seg : this) { // include if date range overlaps - if (startDate < seg.getDateRangeEnd() && seg.getDateRangeStart() < endDate) { + if (tsRange.overlaps(seg.getTSRange())) { // reject too big segment - if (seg.getDateRangeEnd() - seg.getDateRangeStart() > skipSegDateRangeCap) + if (seg.getTSRange().duration() > skipSegDateRangeCap) break; // reject holes - if (result.size() > 0 && result.getLast().getSourceOffsetEnd() != seg.getSourceOffsetStart()) + if (result.size() > 0 && !result.getLast().getSegRange().connects(seg.getSegRange())) break; result.add(seg); @@ -262,11 +280,11 @@ public class Segments<T extends ISegment> extends ArrayList<T> implements Serial continue; } - if (is.getSourceOffsetStart() == js.getSourceOffsetStart()) { + if (is.getSegRange().start.compareTo(js.getSegRange().start) == 0) { // if i, j competes if (isReady(is) && isReady(js) || isNew(is) && isNew(js)) { // if both new or ready, favor the bigger segment - if (is.getSourceOffsetEnd() <= js.getSourceOffsetEnd()) { + if (is.getSegRange().end.compareTo(js.getSegRange().end) <= 0) { tobe.remove(i); } else { tobe.remove(j); @@ -285,7 +303,7 @@ public class Segments<T extends ISegment> extends ArrayList<T> implements Serial } // if i, j in sequence - if (is.getSourceOffsetEnd() <= js.getSourceOffsetStart()) { + if (is.getSegRange().end.compareTo(js.getSegRange().start) <= 0) { i++; j++; continue; @@ -333,4 +351,100 @@ public class Segments<T extends ISegment> extends ArrayList<T> implements Serial return result; } + /** + * Validates: + * - consistent isOffsetCube() + * - for all ready segments, sourceOffset MUST have no overlaps, SHOULD have no holes + * - for all new segments, sourceOffset MUST have no overlaps, MUST contain a ready segment if overlaps with it + * - for all new segments, sourceOffset SHOULD fit/connect another segments + * - dateRange does not matter any more + */ + public void validate() { + if (this.isEmpty()) + return; + + // make a copy, don't modify existing list + Segments<T> all = new Segments<T>(this); + Collections.sort(all); + + // check consistent isOffsetCube() + boolean isOffsetCube = all.get(0).isOffsetCube(); + for (ISegment seg : all) { + seg.validate(); + if (seg.isOffsetCube() != isOffsetCube) + throw new IllegalStateException("Inconsistent isOffsetsOn for segment " + seg); + } + + List<ISegment> ready = Lists.newArrayListWithCapacity(all.size()); + List<ISegment> news = Lists.newArrayListWithCapacity(all.size()); + for (ISegment seg : all) { + if (seg.getStatus() == SegmentStatusEnum.READY) + ready.add(seg); + else + news.add(seg); + } + + // for all ready segments, sourceOffset MUST have no overlaps, SHOULD have no holes + ISegment pre = null; + for (ISegment seg : ready) { + if (pre != null) { + if (pre.getSegRange().overlaps(seg.getSegRange())) + throw new IllegalStateException("Segments overlap: " + pre + " and " + seg); + if (pre.getSegRange().apartBefore(seg.getSegRange())) + logger.warn("Hole between adjacent READY segments " + pre + " and " + seg); + } + pre = seg; + } + + // for all other segments, sourceOffset MUST have no overlaps, MUST contain a ready segment if overlaps with it + pre = null; + for (ISegment seg : news) { + if (pre != null) { + if (pre.getSegRange().overlaps(seg.getSegRange())) + throw new IllegalStateException("Segments overlap: " + pre + " and " + seg); + } + pre = seg; + + for (ISegment aReady : ready) { + if (seg.getSegRange().overlaps(aReady.getSegRange()) && !seg.getSegRange().contains(aReady.getSegRange())) + throw new IllegalStateException("Segments overlap: " + aReady + " and " + seg); + } + } + + // for all other segments, sourceOffset SHOULD fit/connect other segments + for (ISegment seg : news) { + Pair<Boolean, Boolean> pair = all.fitInSegments(seg); + boolean startFit = pair.getFirst(); + boolean endFit = pair.getSecond(); + + if (!startFit) + logger.warn("NEW segment start does not fit/connect with other segments: " + seg); + if (!endFit) + logger.warn("NEW segment end does not fit/connect with other segments: " + seg); + } + } + + public Pair<Boolean, Boolean> fitInSegments(ISegment newOne) { + if (this.isEmpty()) + return null; + + ISegment first = this.get(0); + ISegment last = this.get(this.size() - 1); + Endpoint start = newOne.getSegRange().start; + Endpoint end = newOne.getSegRange().end; + boolean startFit = false; + boolean endFit = false; + for (ISegment sss : this) { + if (sss == newOne) + continue; + startFit = startFit || (start.equals(sss.getSegRange().start) || start.equals(sss.getSegRange().end)); + endFit = endFit || (end.equals(sss.getSegRange().start) || end.equals(sss.getSegRange().end)); + } + if (!startFit && endFit && newOne == first) + startFit = true; + if (!endFit && startFit && newOne == last) + endFit = true; + + return Pair.newPair(startFit, endFit); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java index f4aec12..5c2ebac 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java @@ -25,6 +25,7 @@ import java.util.Map; import com.google.common.collect.Maps; +@SuppressWarnings("serial") public class TableRef implements Serializable { final transient private DataModelDesc model; http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java index 5563856..e7250a3 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java @@ -27,7 +27,7 @@ import org.apache.kylin.metadata.datatype.DataType; /** */ -@SuppressWarnings({ "serial", "deprecation" }) +@SuppressWarnings({ "serial" }) public class TblColRef implements Serializable { private static final String INNER_TABLE_NAME = "_kylin_table"; http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java b/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java index 43e46c6..b6d91ab 100644 --- a/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java +++ b/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java @@ -21,6 +21,9 @@ package org.apache.kylin.source; import java.util.HashMap; import java.util.Map; +import org.apache.kylin.metadata.model.SegmentRange; +import org.apache.kylin.metadata.model.SegmentRange.TSRange; + import com.google.common.base.Objects; /** @@ -34,55 +37,35 @@ import com.google.common.base.Objects; * its own start and end offset within that partition. */ public class SourcePartition { - long startDate; - long endDate; - long startOffset; - long endOffset; + TSRange tsRange; + SegmentRange segRange; Map<Integer, Long> sourcePartitionOffsetStart; Map<Integer, Long> sourcePartitionOffsetEnd; public SourcePartition() { } - public SourcePartition(long startDate, long endDate, long startOffset, long endOffset, Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd) { - this.startDate = startDate; - this.endDate = endDate; - this.startOffset = startOffset; - this.endOffset = endOffset; + public SourcePartition(TSRange tsRange, SegmentRange segRange, Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd) { + this.tsRange = tsRange; + this.segRange = segRange; this.sourcePartitionOffsetStart = sourcePartitionOffsetStart; this.sourcePartitionOffsetEnd = sourcePartitionOffsetEnd; } - public long getStartDate() { - return startDate; - } - - public void setStartDate(long startDate) { - this.startDate = startDate; - } - - public long getEndDate() { - return endDate; - } - - public void setEndDate(long endDate) { - this.endDate = endDate; - } - - public long getStartOffset() { - return startOffset; + public TSRange getTSRange() { + return tsRange; } - public void setStartOffset(long startOffset) { - this.startOffset = startOffset; + public void setTSRange(TSRange tsRange) { + this.tsRange = tsRange; } - public long getEndOffset() { - return endOffset; + public SegmentRange getSegRange() { + return segRange; } - public void setEndOffset(long endOffset) { - this.endOffset = endOffset; + public void setSegRange(SegmentRange segRange) { + this.segRange = segRange; } public Map<Integer, Long> getSourcePartitionOffsetStart() { @@ -103,15 +86,13 @@ public class SourcePartition { @Override public String toString() { - return Objects.toStringHelper(this).add("startDate", startDate).add("endDate", endDate).add("startOffset", startOffset).add("endOffset", endOffset).add("sourcePartitionOffsetStart", sourcePartitionOffsetStart.toString()).add("sourcePartitionOffsetEnd", sourcePartitionOffsetEnd.toString()).toString(); + return Objects.toStringHelper(this).add("tsRange", tsRange).add("segRange", segRange).add("sourcePartitionOffsetStart", sourcePartitionOffsetStart.toString()).add("sourcePartitionOffsetEnd", sourcePartitionOffsetEnd.toString()).toString(); } public static SourcePartition getCopyOf(SourcePartition origin) { SourcePartition copy = new SourcePartition(); - copy.setStartDate(origin.getStartDate()); - copy.setEndDate(origin.getEndDate()); - copy.setStartOffset(origin.getStartOffset()); - copy.setEndOffset(origin.getEndOffset()); + copy.setTSRange(origin.getTSRange()); + copy.setSegRange(origin.getSegRange()); if (origin.getSourcePartitionOffsetStart() != null) { copy.setSourcePartitionOffsetStart(new HashMap<>(origin.getSourcePartitionOffsetStart())); } http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-metadata/src/test/java/org/apache/kylin/metadata/model/DefaultPartitionConditionBuilderTest.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/test/java/org/apache/kylin/metadata/model/DefaultPartitionConditionBuilderTest.java b/core-metadata/src/test/java/org/apache/kylin/metadata/model/DefaultPartitionConditionBuilderTest.java index 907e0e5..19d6f2f 100644 --- a/core-metadata/src/test/java/org/apache/kylin/metadata/model/DefaultPartitionConditionBuilderTest.java +++ b/core-metadata/src/test/java/org/apache/kylin/metadata/model/DefaultPartitionConditionBuilderTest.java @@ -20,6 +20,7 @@ package org.apache.kylin.metadata.model; import org.apache.kylin.common.util.DateFormat; import org.apache.kylin.common.util.LocalFileMetadataTestCase; +import org.apache.kylin.metadata.model.SegmentRange.TSRange; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -51,10 +52,9 @@ public class DefaultPartitionConditionBuilderTest extends LocalFileMetadataTestC partitionDesc.setPartitionDateColumnRef(col); partitionDesc.setPartitionDateColumn(col.getCanonicalName()); partitionDesc.setPartitionDateFormat("yyyy-MM-dd"); - String condition = partitionConditionBuilder.buildDateRangeCondition(partitionDesc, - DateFormat.stringToMillis("2016-02-22"), DateFormat.stringToMillis("2016-02-23")); - Assert.assertEquals("UNKNOWN_ALIAS.DATE_COLUMN >= '2016-02-22' AND UNKNOWN_ALIAS.DATE_COLUMN < '2016-02-23'", - condition); + TSRange range = new TSRange(DateFormat.stringToMillis("2016-02-22"), DateFormat.stringToMillis("2016-02-23")); + String condition = partitionConditionBuilder.buildDateRangeCondition(partitionDesc, range); + Assert.assertEquals("UNKNOWN_ALIAS.DATE_COLUMN >= '2016-02-22' AND UNKNOWN_ALIAS.DATE_COLUMN < '2016-02-23'", condition); } @Test @@ -64,8 +64,8 @@ public class DefaultPartitionConditionBuilderTest extends LocalFileMetadataTestC partitionDesc.setPartitionTimeColumnRef(col); partitionDesc.setPartitionTimeColumn(col.getCanonicalName()); partitionDesc.setPartitionTimeFormat("HH"); - String condition = partitionConditionBuilder.buildDateRangeCondition(partitionDesc, - DateFormat.stringToMillis("2016-02-22 00:00:00"), DateFormat.stringToMillis("2016-02-23 01:00:00")); + TSRange range = new TSRange(DateFormat.stringToMillis("2016-02-22 00:00:00"), DateFormat.stringToMillis("2016-02-23 01:00:00")); + String condition = partitionConditionBuilder.buildDateRangeCondition(partitionDesc, range); Assert.assertEquals("UNKNOWN_ALIAS.HOUR_COLUMN >= '00' AND UNKNOWN_ALIAS.HOUR_COLUMN < '01'", condition); } @@ -80,11 +80,9 @@ public class DefaultPartitionConditionBuilderTest extends LocalFileMetadataTestC partitionDesc.setPartitionTimeColumnRef(col2); partitionDesc.setPartitionTimeColumn(col2.getCanonicalName()); partitionDesc.setPartitionTimeFormat("H"); - String condition = partitionConditionBuilder.buildDateRangeCondition(partitionDesc, - DateFormat.stringToMillis("2016-02-22 00:00:00"), DateFormat.stringToMillis("2016-02-23 01:00:00")); - Assert.assertEquals( - "((UNKNOWN_ALIAS.DATE_COLUMN = '2016-02-22' AND UNKNOWN_ALIAS.HOUR_COLUMN >= '0') OR (UNKNOWN_ALIAS.DATE_COLUMN > '2016-02-22')) AND ((UNKNOWN_ALIAS.DATE_COLUMN = '2016-02-23' AND UNKNOWN_ALIAS.HOUR_COLUMN < '1') OR (UNKNOWN_ALIAS.DATE_COLUMN < '2016-02-23'))", - condition); + TSRange range = new TSRange(DateFormat.stringToMillis("2016-02-22 00:00:00"), DateFormat.stringToMillis("2016-02-23 01:00:00")); + String condition = partitionConditionBuilder.buildDateRangeCondition(partitionDesc, range); + Assert.assertEquals("((UNKNOWN_ALIAS.DATE_COLUMN = '2016-02-22' AND UNKNOWN_ALIAS.HOUR_COLUMN >= '0') OR (UNKNOWN_ALIAS.DATE_COLUMN > '2016-02-22')) AND ((UNKNOWN_ALIAS.DATE_COLUMN = '2016-02-23' AND UNKNOWN_ALIAS.HOUR_COLUMN < '1') OR (UNKNOWN_ALIAS.DATE_COLUMN < '2016-02-23'))", condition); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java b/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java index bfddb1f..5db3611 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java @@ -268,6 +268,6 @@ public class HBaseKeyRange implements Comparable<HBaseKeyRange> { } public boolean hitSegment() { - return cubeSeg.getDateRangeStart() <= getPartitionColumnEndDate() && cubeSeg.getDateRangeEnd() >= getPartitionColumnStartDate(); + return cubeSeg.getTSRange().start.v <= getPartitionColumnEndDate() && cubeSeg.getTSRange().end.v >= getPartitionColumnStartDate(); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java index 2efd718..dd221f1 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java @@ -36,6 +36,7 @@ import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableContext; import org.apache.kylin.job.execution.ExecuteResult; +import org.apache.kylin.metadata.model.SegmentRange.TSRange; import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,7 +68,7 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable { segment.setInputRecordsSize(sourceSizeBytes); try { - if (segment.isSourceOffsetsOn()) { + if (segment.isOffsetCube()) { updateTimeRange(segment); } @@ -108,8 +109,8 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable { IOUtils.closeQuietly(isr); IOUtils.closeQuietly(bufferedReader); } + logger.info("updateTimeRange step. minValue:" + minValue + " maxValue:" + maxValue); - segment.setDateRangeStart(minValue); - segment.setDateRangeEnd(maxValue); + segment.setTSRange(new TSRange(minValue, maxValue)); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java index 04af4fe..a1815e2 100644 --- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java @@ -41,6 +41,7 @@ import org.apache.kylin.dict.DictionaryManager; import org.apache.kylin.dict.IterableDictionaryValueEnumerator; import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.datatype.DataType; +import org.apache.kylin.metadata.model.SegmentRange.TSRange; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.metadata.project.ProjectManager; import org.apache.kylin.source.IReadableTable.TableSignature; @@ -158,7 +159,7 @@ public class MergeCuboidMapperTest extends LocalFileMetadataTestCase { // String cubeName = "test_kylin_cube_without_slr_left_join_ready_2_segments"; - CubeSegment newSeg = cubeManager.mergeSegments(cube, 0L, Long.MAX_VALUE, 0, 0, false); + CubeSegment newSeg = cubeManager.mergeSegments(cube, new TSRange(0L, Long.MAX_VALUE), null, false); // String segmentName = newSeg.getName(); final Dictionary<String> dictionary = cubeManager.getDictionary(newSeg, lfn); http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java index 4ff303d..42fc124 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java @@ -55,10 +55,8 @@ import org.apache.kylin.job.execution.DefaultChainedExecutable; import org.apache.kylin.job.execution.ExecutableManager; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.impl.threadpool.DefaultScheduler; +import org.apache.kylin.metadata.model.SegmentRange.TSRange; import org.apache.kylin.rest.job.StorageCleanupJob; -import org.apache.kylin.source.ISource; -import org.apache.kylin.source.SourceFactory; -import org.apache.kylin.source.SourcePartition; import org.apache.kylin.storage.hbase.HBaseConnection; import org.apache.kylin.storage.hbase.util.HBaseRegionSizeCalculator; import org.apache.kylin.storage.hbase.util.ZookeeperJobLock; @@ -243,7 +241,6 @@ public class BuildCubeWithEngine { this.countDownLatch = countDownLatch; } - @SuppressWarnings("unchecked") @Override public Boolean call() throws Exception { try { @@ -259,12 +256,10 @@ public class BuildCubeWithEngine { } } - @SuppressWarnings("unused") protected boolean testTableExt() throws Exception { return true; } - @SuppressWarnings("unused") protected boolean testModel() throws Exception { return true; } @@ -322,6 +317,7 @@ public class BuildCubeWithEngine { return false; } + @SuppressWarnings("unused") private void updateCubeEngineType(String cubeName) throws IOException { CubeDesc cubeDesc = cubeDescManager.getCubeDesc(cubeName); if (cubeDesc.getEngineType() != engineType) { @@ -339,7 +335,7 @@ public class BuildCubeWithEngine { } private Boolean mergeSegment(String cubeName, long startDate, long endDate) throws Exception { - CubeSegment segment = cubeManager.mergeSegments(cubeManager.getCube(cubeName), startDate, endDate, 0, 0, true); + CubeSegment segment = cubeManager.mergeSegments(cubeManager.getCube(cubeName), new TSRange(startDate, endDate), null, true); DefaultChainedExecutable job = EngineFactory.createBatchMergeJob(segment, "TEST"); jobService.addJob(job); ExecutableState state = waitForJob(job.getId()); @@ -348,9 +344,7 @@ public class BuildCubeWithEngine { private Boolean buildSegment(String cubeName, long startDate, long endDate) throws Exception { CubeInstance cubeInstance = cubeManager.getCube(cubeName); - ISource source = SourceFactory.getSource(cubeInstance); - SourcePartition partition = source.enrichSourcePartitionBeforeBuild(cubeInstance, new SourcePartition(0, endDate, 0, 0, null, null)); - CubeSegment segment = cubeManager.appendSegment(cubeInstance, partition.getStartDate(), partition.getEndDate()); + CubeSegment segment = cubeManager.appendSegment(cubeInstance, new TSRange(0L, endDate)); DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST"); jobService.addJob(job); ExecutableState state = waitForJob(job.getId()); http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java index 9c80413..239b4af 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java @@ -57,9 +57,11 @@ import org.apache.kylin.job.execution.ExecutableManager; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.impl.threadpool.DefaultScheduler; import org.apache.kylin.job.streaming.Kafka10DataLoader; +import org.apache.kylin.metadata.model.SegmentRange; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.streaming.StreamingConfig; import org.apache.kylin.metadata.streaming.StreamingManager; +import org.apache.kylin.rest.job.StorageCleanupJob; import org.apache.kylin.source.ISource; import org.apache.kylin.source.SourceFactory; import org.apache.kylin.source.SourcePartition; @@ -68,7 +70,6 @@ import org.apache.kylin.source.kafka.config.BrokerConfig; import org.apache.kylin.source.kafka.config.KafkaConfig; import org.apache.kylin.storage.hbase.util.ZookeeperJobLock; import org.apache.kylin.storage.hbase.util.ZookeeperUtil; -import org.apache.kylin.rest.job.StorageCleanupJob; import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -245,9 +246,9 @@ public class BuildCubeWithStream { Assert.assertTrue(segments.size() == succeedBuild); if (fastBuildMode == false) { - long endOffset = segments.get(segments.size() - 1).getSourceOffsetEnd(); + long endOffset = (Long) segments.get(segments.size() - 1).getSegRange().end.v; //merge - ExecutableState result = mergeSegment(cubeName, 0, endOffset); + ExecutableState result = mergeSegment(cubeName, new SegmentRange(0L, endOffset)); Assert.assertTrue(result == ExecutableState.SUCCEED); segments = cubeManager.getCube(cubeName).getSegments(); @@ -255,7 +256,7 @@ public class BuildCubeWithStream { CubeSegment toRefreshSeg = segments.get(0); - refreshSegment(cubeName, toRefreshSeg.getSourceOffsetStart(), toRefreshSeg.getSourceOffsetEnd()); + refreshSegment(cubeName, toRefreshSeg.getSegRange()); segments = cubeManager.getCube(cubeName).getSegments(); Assert.assertTrue(segments.size() == 1); } @@ -263,16 +264,16 @@ public class BuildCubeWithStream { logger.info("Build is done"); } - private ExecutableState mergeSegment(String cubeName, long startOffset, long endOffset) throws Exception { - CubeSegment segment = cubeManager.mergeSegments(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, false); + private ExecutableState mergeSegment(String cubeName, SegmentRange segRange) throws Exception { + CubeSegment segment = cubeManager.mergeSegments(cubeManager.getCube(cubeName), null, segRange, false); DefaultChainedExecutable job = EngineFactory.createBatchMergeJob(segment, "TEST"); jobService.addJob(job); waitForJob(job.getId()); return job.getStatus(); } - private String refreshSegment(String cubeName, long startOffset, long endOffset) throws Exception { - CubeSegment segment = cubeManager.refreshSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset); + private String refreshSegment(String cubeName, SegmentRange segRange) throws Exception { + CubeSegment segment = cubeManager.refreshSegment(cubeManager.getCube(cubeName), null, segRange); DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST"); jobService.addJob(job); waitForJob(job.getId()); @@ -282,7 +283,7 @@ public class BuildCubeWithStream { protected ExecutableState buildSegment(String cubeName, long startOffset, long endOffset) throws Exception { CubeInstance cubeInstance = cubeManager.getCube(cubeName); ISource source = SourceFactory.getSource(cubeInstance); - SourcePartition partition = source.enrichSourcePartitionBeforeBuild(cubeInstance, new SourcePartition(0, 0, startOffset, endOffset, null, null)); + SourcePartition partition = source.enrichSourcePartitionBeforeBuild(cubeInstance, new SourcePartition(null, new SegmentRange(startOffset, endOffset), null, null)); CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), partition); DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST"); jobService.addJob(job); http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/query/.settings/org.eclipse.core.resources.prefs ---------------------------------------------------------------------- diff --git a/query/.settings/org.eclipse.core.resources.prefs b/query/.settings/org.eclipse.core.resources.prefs index 29abf99..839d647 100644 --- a/query/.settings/org.eclipse.core.resources.prefs +++ b/query/.settings/org.eclipse.core.resources.prefs @@ -2,5 +2,4 @@ eclipse.preferences.version=1 encoding//src/main/java=UTF-8 encoding//src/main/resources=UTF-8 encoding//src/test/java=UTF-8 -encoding//src/test/resources=UTF-8 encoding/<project>=UTF-8 http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java index 4244cf3..14014fc 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java @@ -41,6 +41,8 @@ import org.apache.kylin.job.JobInstance; import org.apache.kylin.job.JoinedFlatTable; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.ISourceAware; +import org.apache.kylin.metadata.model.SegmentRange; +import org.apache.kylin.metadata.model.SegmentRange.TSRange; import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.metadata.realization.RealizationStatusEnum; import org.apache.kylin.rest.exception.BadRequestException; @@ -270,8 +272,8 @@ public class CubeController extends BasicController { @RequestMapping(value = "/{cubeName}/rebuild", method = { RequestMethod.PUT }, produces = { "application/json" }) @ResponseBody public JobInstance rebuild(@PathVariable String cubeName, @RequestBody JobBuildRequest req) { - return buildInternal(cubeName, req.getStartTime(), req.getEndTime(), 0, 0, null, null, req.getBuildType(), - req.isForce() || req.isForceMergeEmptySegment()); + return buildInternal(cubeName, new TSRange(req.getStartTime(), req.getEndTime()), null, null, null, + req.getBuildType(), req.isForce() || req.isForceMergeEmptySegment()); } /** Build/Rebuild a cube segment by source offset */ @@ -297,14 +299,14 @@ public class CubeController extends BasicController { @RequestMapping(value = "/{cubeName}/rebuild2", method = { RequestMethod.PUT }, produces = { "application/json" }) @ResponseBody public JobInstance rebuild2(@PathVariable String cubeName, @RequestBody JobBuildRequest2 req) { - return buildInternal(cubeName, 0, 0, req.getSourceOffsetStart(), req.getSourceOffsetEnd(), + return buildInternal(cubeName, null, new SegmentRange(req.getSourceOffsetStart(), req.getSourceOffsetEnd()), req.getSourcePartitionOffsetStart(), req.getSourcePartitionOffsetEnd(), req.getBuildType(), req.isForce()); } - private JobInstance buildInternal(String cubeName, long startTime, long endTime, // - long startOffset, long endOffset, Map<Integer, Long> sourcePartitionOffsetStart, - Map<Integer, Long> sourcePartitionOffsetEnd, String buildType, boolean force) { + private JobInstance buildInternal(String cubeName, TSRange tsRange, SegmentRange segRange, // + Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd, + String buildType, boolean force) { try { String submitter = SecurityContextHolder.getContext().getAuthentication().getName(); CubeInstance cube = jobService.getCubeManager().getCube(cubeName); @@ -312,9 +314,8 @@ public class CubeController extends BasicController { if (cube == null) { throw new InternalErrorException("Cannot find cube " + cubeName); } - return jobService.submitJob(cube, startTime, endTime, startOffset, endOffset, // - sourcePartitionOffsetStart, sourcePartitionOffsetEnd, CubeBuildTypeEnum.valueOf(buildType), force, - submitter); + return jobService.submitJob(cube, tsRange, segRange, sourcePartitionOffsetStart, sourcePartitionOffsetEnd, + CubeBuildTypeEnum.valueOf(buildType), force, submitter); } catch (Throwable e) { logger.error(e.getLocalizedMessage(), e); throw new InternalErrorException(e.getLocalizedMessage(), e); @@ -581,14 +582,14 @@ public class CubeController extends BasicController { } hr.setTableName(tableName); - hr.setDateRangeStart(segment.getDateRangeStart()); - hr.setDateRangeEnd(segment.getDateRangeEnd()); + hr.setDateRangeStart(segment.getTSRange().start.v); + hr.setDateRangeEnd(segment.getTSRange().end.v); hr.setSegmentName(segment.getName()); hr.setSegmentStatus(segment.getStatus().toString()); hr.setSourceCount(segment.getInputRecords()); - if (segment.isSourceOffsetsOn()) { - hr.setSourceOffsetStart(segment.getSourceOffsetStart()); - hr.setSourceOffsetEnd(segment.getSourceOffsetEnd()); + if (segment.isOffsetCube()) { + hr.setSourceOffsetStart((Long) segment.getSegRange().start.v); + hr.setSourceOffsetEnd((Long) segment.getSegRange().end.v); } hbase.add(hr); } @@ -627,14 +628,13 @@ public class CubeController extends BasicController { return jobs; } - boolean isOffsetOn = holes.get(0).isSourceOffsetsOn(); for (CubeSegment hole : holes) { - if (isOffsetOn == true) { + if (hole.isOffsetCube()) { JobBuildRequest2 request = new JobBuildRequest2(); request.setBuildType(CubeBuildTypeEnum.BUILD.toString()); - request.setSourceOffsetStart(hole.getSourceOffsetStart()); + request.setSourceOffsetStart((Long) hole.getSegRange().start.v); + request.setSourceOffsetEnd((Long) hole.getSegRange().end.v); request.setSourcePartitionOffsetStart(hole.getSourcePartitionOffsetStart()); - request.setSourceOffsetEnd(hole.getSourceOffsetEnd()); request.setSourcePartitionOffsetEnd(hole.getSourcePartitionOffsetEnd()); try { JobInstance job = build2(cubeName, request); @@ -647,8 +647,8 @@ public class CubeController extends BasicController { } else { JobBuildRequest request = new JobBuildRequest(); request.setBuildType(CubeBuildTypeEnum.BUILD.toString()); - request.setStartTime(hole.getDateRangeStart()); - request.setEndTime(hole.getDateRangeEnd()); + request.setStartTime(hole.getTSRange().start.v); + request.setEndTime(hole.getTSRange().end.v); try { JobInstance job = build(cubeName, request); http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/server-base/src/main/java/org/apache/kylin/rest/controller2/CubeControllerV2.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller2/CubeControllerV2.java b/server-base/src/main/java/org/apache/kylin/rest/controller2/CubeControllerV2.java index f8d5f83..292b633 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller2/CubeControllerV2.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller2/CubeControllerV2.java @@ -37,6 +37,8 @@ import org.apache.kylin.job.JoinedFlatTable; import org.apache.kylin.metadata.draft.Draft; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.ISourceAware; +import org.apache.kylin.metadata.model.SegmentRange; +import org.apache.kylin.metadata.model.SegmentRange.TSRange; import org.apache.kylin.metadata.realization.RealizationStatusEnum; import org.apache.kylin.rest.controller.BasicController; import org.apache.kylin.rest.exception.BadRequestException; @@ -355,8 +357,8 @@ public class CubeControllerV2 extends BasicController { throws IOException { return new EnvelopeResponse(ResponseCode.CODE_SUCCESS, - buildInternalV2(cubeName, req.getStartTime(), req.getEndTime(), 0, 0, null, null, req.getBuildType(), - req.isForce() || req.isForceMergeEmptySegment()), + buildInternalV2(cubeName, new TSRange(req.getStartTime(), req.getEndTime()), null, null, null, + req.getBuildType(), req.isForce() || req.isForceMergeEmptySegment()), ""); } @@ -392,15 +394,15 @@ public class CubeControllerV2 extends BasicController { throws IOException { return new EnvelopeResponse(ResponseCode.CODE_SUCCESS, - buildInternalV2(cubeName, 0, 0, req.getSourceOffsetStart(), req.getSourceOffsetEnd(), + buildInternalV2(cubeName, null, new SegmentRange(req.getSourceOffsetStart(), req.getSourceOffsetEnd()), req.getSourcePartitionOffsetStart(), req.getSourcePartitionOffsetEnd(), req.getBuildType(), req.isForce()), ""); } - private JobInstance buildInternalV2(String cubeName, long startTime, long endTime, // - long startOffset, long endOffset, Map<Integer, Long> sourcePartitionOffsetStart, - Map<Integer, Long> sourcePartitionOffsetEnd, String buildType, boolean force) throws IOException { + private JobInstance buildInternalV2(String cubeName, TSRange tsRange, SegmentRange segRange, // + Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd, + String buildType, boolean force) throws IOException { Message msg = MsgPicker.getMsg(); String submitter = SecurityContextHolder.getContext().getAuthentication().getName(); @@ -412,9 +414,8 @@ public class CubeControllerV2 extends BasicController { if (cube.getDescriptor().isDraft()) { throw new BadRequestException(msg.getBUILD_DRAFT_CUBE()); } - return jobService.submitJob(cube, startTime, endTime, startOffset, endOffset, // - sourcePartitionOffsetStart, sourcePartitionOffsetEnd, CubeBuildTypeEnum.valueOf(buildType), force, - submitter); + return jobService.submitJob(cube, tsRange, segRange, sourcePartitionOffsetStart, sourcePartitionOffsetEnd, + CubeBuildTypeEnum.valueOf(buildType), force, submitter); } @RequestMapping(value = "/{cubeName}/purge", method = { RequestMethod.PUT }, produces = { @@ -470,15 +471,15 @@ public class CubeControllerV2 extends BasicController { } hr.setTableName(tableName); - hr.setDateRangeStart(segment.getDateRangeStart()); - hr.setDateRangeEnd(segment.getDateRangeEnd()); + hr.setDateRangeStart(segment.getTSRange().start.v); + hr.setDateRangeEnd(segment.getTSRange().end.v); hr.setSegmentName(segment.getName()); hr.setSegmentUUID(segment.getUuid()); hr.setSegmentStatus(segment.getStatus().toString()); hr.setSourceCount(segment.getInputRecords()); - if (segment.isSourceOffsetsOn()) { - hr.setSourceOffsetStart(segment.getSourceOffsetStart()); - hr.setSourceOffsetEnd(segment.getSourceOffsetEnd()); + if (segment.isOffsetCube()) { + hr.setSourceOffsetStart((Long) segment.getSegRange().start.v); + hr.setSourceOffsetEnd((Long) segment.getSegRange().end.v); } hbase.add(hr); } @@ -524,14 +525,13 @@ public class CubeControllerV2 extends BasicController { return new EnvelopeResponse(ResponseCode.CODE_SUCCESS, jobs, ""); } - boolean isOffsetOn = holes.get(0).isSourceOffsetsOn(); for (CubeSegment hole : holes) { - if (isOffsetOn == true) { + if (hole.isOffsetCube()) { JobBuildRequest2 request = new JobBuildRequest2(); request.setBuildType(CubeBuildTypeEnum.BUILD.toString()); - request.setSourceOffsetStart(hole.getSourceOffsetStart()); + request.setSourceOffsetStart((Long) hole.getSegRange().start.v); + request.setSourceOffsetEnd((Long) hole.getSegRange().end.v); request.setSourcePartitionOffsetStart(hole.getSourcePartitionOffsetStart()); - request.setSourceOffsetEnd(hole.getSourceOffsetEnd()); request.setSourcePartitionOffsetEnd(hole.getSourcePartitionOffsetEnd()); try { JobInstance job = (JobInstance) build2V2(cubeName, request).data; @@ -544,8 +544,8 @@ public class CubeControllerV2 extends BasicController { } else { JobBuildRequest request = new JobBuildRequest(); request.setBuildType(CubeBuildTypeEnum.BUILD.toString()); - request.setStartTime(hole.getDateRangeStart()); - request.setEndTime(hole.getDateRangeEnd()); + request.setStartTime(hole.getTSRange().start.v); + request.setEndTime(hole.getTSRange().end.v); try { JobInstance job = (JobInstance) buildV2(cubeName, request).data; http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/server-base/src/main/java/org/apache/kylin/rest/job/HybridCubeCLI.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/job/HybridCubeCLI.java b/server-base/src/main/java/org/apache/kylin/rest/job/HybridCubeCLI.java index 64fcbc8..875e978 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/job/HybridCubeCLI.java +++ b/server-base/src/main/java/org/apache/kylin/rest/job/HybridCubeCLI.java @@ -18,6 +18,10 @@ package org.apache.kylin.rest.job; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; @@ -39,10 +43,6 @@ import org.apache.kylin.storage.hybrid.HybridManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - /** * 1. Create new HybridCube * bin/kylin.sh org.apache.kylin.tool.HybridCubeCLI -action create -name hybrid_name -project project_name -model model_name -cubes cube1,cube2 @@ -194,12 +194,12 @@ public class HybridCubeCLI extends AbstractApplication { if (segment == null) continue; if (lastOffset == -1) { - lastOffset = segment.getSourceOffsetEnd(); + lastOffset = (Long) segment.getSegRange().end.v; } else { - if (lastOffset > segment.getSourceOffsetStart()) { - throw new RuntimeException("Segments has overlap, could not hybrid. Last Segment End: " + lastOffset + ", Next Segment Start: " + segment.getSourceOffsetStart()); + if (lastOffset > (Long) segment.getSegRange().start.v) { + throw new RuntimeException("Segments has overlap, could not hybrid. Last Segment End: " + lastOffset + ", Next Segment Start: " + segment.getSegRange().start.v); } - lastOffset = segment.getSourceOffsetEnd(); + lastOffset = (Long) segment.getSegRange().end.v; } } } http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java index fe8fb6c..e79bab9 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java @@ -28,7 +28,6 @@ import java.util.List; import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.RootPersistentEntity; -import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; @@ -43,6 +42,7 @@ import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.metadata.cachesync.Broadcaster; import org.apache.kylin.metadata.draft.Draft; import org.apache.kylin.metadata.model.DataModelDesc; +import org.apache.kylin.metadata.model.SegmentRange; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.metadata.project.ProjectManager; @@ -537,11 +537,11 @@ public class CubeService extends BasicService implements InitializingBean { return; List<CubeSegment> toRemoveSegs = Lists.newArrayList(); - long tail = readySegs.get(readySegs.size() - 1).getDateRangeEnd(); + long tail = readySegs.get(readySegs.size() - 1).getTSRange().end.v; long head = tail - desc.getRetentionRange(); for (CubeSegment seg : readySegs) { - if (seg.getDateRangeEnd() > 0) { // for streaming cube its initial value is 0 - if (seg.getDateRangeEnd() <= head) { + if (seg.getTSRange().end.v > 0) { // for streaming cube its initial value is 0 + if (seg.getTSRange().end.v <= head) { toRemoveSegs.add(seg); } } @@ -567,10 +567,9 @@ public class CubeService extends BasicService implements InitializingBean { synchronized (CubeService.class) { try { cube = getCubeManager().getCube(cubeName); - Pair<Long, Long> offsets = getCubeManager().autoMergeCubeSegments(cube); + SegmentRange offsets = cube.autoMergeCubeSegments(); if (offsets != null) { - CubeSegment newSeg = getCubeManager().mergeSegments(cube, 0, 0, offsets.getFirst(), - offsets.getSecond(), true); + CubeSegment newSeg = getCubeManager().mergeSegments(cube, null, offsets, true); logger.debug("Will submit merge job on " + newSeg); DefaultChainedExecutable job = EngineFactory.createBatchMergeJob(newSeg, "SYSTEM"); getExecutableManager().addJob(job); http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java index 8257d4c..d2180a7 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -18,12 +18,18 @@ package org.apache.kylin.rest.service; -import com.google.common.base.Function; -import com.google.common.base.Predicate; -import com.google.common.base.Predicates; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; +import java.io.IOException; +import java.util.Calendar; +import java.util.Collections; +import java.util.Date; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TimeZone; + +import javax.annotation.Nullable; + import org.apache.commons.lang3.StringUtils; import org.apache.directory.api.util.Strings; import org.apache.kylin.common.KylinConfig; @@ -48,6 +54,8 @@ import org.apache.kylin.job.execution.DefaultChainedExecutable; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.execution.Output; import org.apache.kylin.job.lock.JobLock; +import org.apache.kylin.metadata.model.SegmentRange; +import org.apache.kylin.metadata.model.SegmentRange.TSRange; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.realization.RealizationStatusEnum; import org.apache.kylin.rest.exception.BadRequestException; @@ -65,16 +73,12 @@ import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.EnableAspectJAutoProxy; import org.springframework.stereotype.Component; -import javax.annotation.Nullable; -import java.io.IOException; -import java.util.Calendar; -import java.util.Collections; -import java.util.Date; -import java.util.EnumSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TimeZone; +import com.google.common.base.Function; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; /** * @author ysong1 @@ -200,11 +204,11 @@ public class JobService extends BasicService implements InitializingBean { } } - public JobInstance submitJob(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset, // + public JobInstance submitJob(CubeInstance cube, TSRange tsRange, SegmentRange segRange, // Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd, CubeBuildTypeEnum buildType, boolean force, String submitter) throws IOException { aclEvaluate.checkProjectOperationPermission(cube); - JobInstance jobInstance = submitJobInternal(cube, startDate, endDate, startOffset, endOffset, sourcePartitionOffsetStart, + JobInstance jobInstance = submitJobInternal(cube, tsRange, segRange, sourcePartitionOffsetStart, sourcePartitionOffsetEnd, buildType, force, submitter); accessService.init(jobInstance, null); @@ -213,9 +217,8 @@ public class JobService extends BasicService implements InitializingBean { return jobInstance; } - public JobInstance submitJobInternal(CubeInstance cube, long startDate, long endDate, long startOffset, - long endOffset, // - Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd, + public JobInstance submitJobInternal(CubeInstance cube, TSRange tsRange, SegmentRange segRange, // + Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd, // CubeBuildTypeEnum buildType, boolean force, String submitter) throws IOException { Message msg = MsgPicker.getMsg(); @@ -230,16 +233,15 @@ public class JobService extends BasicService implements InitializingBean { try { if (buildType == CubeBuildTypeEnum.BUILD) { ISource source = SourceFactory.getSource(cube); - SourcePartition sourcePartition = new SourcePartition(startDate, endDate, startOffset, endOffset, - sourcePartitionOffsetStart, sourcePartitionOffsetEnd); - sourcePartition = source.enrichSourcePartitionBeforeBuild(cube, sourcePartition); - newSeg = getCubeManager().appendSegment(cube, sourcePartition); + SourcePartition src = new SourcePartition(tsRange, segRange, sourcePartitionOffsetStart, sourcePartitionOffsetEnd); + src = source.enrichSourcePartitionBeforeBuild(cube, src); + newSeg = getCubeManager().appendSegment(cube, src); job = EngineFactory.createBatchCubingJob(newSeg, submitter); } else if (buildType == CubeBuildTypeEnum.MERGE) { - newSeg = getCubeManager().mergeSegments(cube, startDate, endDate, startOffset, endOffset, force); + newSeg = getCubeManager().mergeSegments(cube, tsRange, segRange, force); job = EngineFactory.createBatchMergeJob(newSeg, submitter); } else if (buildType == CubeBuildTypeEnum.REFRESH) { - newSeg = getCubeManager().refreshSegment(cube, startDate, endDate, startOffset, endOffset); + newSeg = getCubeManager().refreshSegment(cube, tsRange, segRange); job = EngineFactory.createBatchCubingJob(newSeg, submitter); } else { throw new BadRequestException(String.format(msg.getINVALID_BUILD_TYPE(), buildType)); @@ -335,7 +337,7 @@ public class JobService extends BasicService implements InitializingBean { final String segmentIds = job.getRelatedSegment(); for (String segmentId : StringUtils.split(segmentIds)) { final CubeSegment segment = cubeInstance.getSegmentById(segmentId); - if (segment != null && (segment.getStatus() == SegmentStatusEnum.NEW || segment.getDateRangeEnd() == 0)) { + if (segment != null && (segment.getStatus() == SegmentStatusEnum.NEW || segment.getTSRange().end.v == 0)) { // Remove this segments CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance); cubeBuilder.setToRemoveSegs(segment); http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/server-base/src/main/java/org/apache/kylin/rest/service/ModelService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/ModelService.java b/server-base/src/main/java/org/apache/kylin/rest/service/ModelService.java index 42cf085..61c8bff 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/ModelService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/ModelService.java @@ -42,6 +42,7 @@ import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.ISegment; import org.apache.kylin.metadata.model.JoinsTree; import org.apache.kylin.metadata.model.ModelDimensionDesc; +import org.apache.kylin.metadata.model.SegmentRange; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.metadata.project.ProjectInstance; @@ -257,13 +258,8 @@ public class ModelService extends BasicService { } @Override - public long getSourceOffsetStart() { - return 0; - } - - @Override - public long getSourceOffsetEnd() { - return 0; + public SegmentRange getSegRange() { + return null; } @Override http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/server/src/test/java/org/apache/kylin/rest/controller/CubeControllerTest.java ---------------------------------------------------------------------- diff --git a/server/src/test/java/org/apache/kylin/rest/controller/CubeControllerTest.java b/server/src/test/java/org/apache/kylin/rest/controller/CubeControllerTest.java index e67c238..2c91d90 100644 --- a/server/src/test/java/org/apache/kylin/rest/controller/CubeControllerTest.java +++ b/server/src/test/java/org/apache/kylin/rest/controller/CubeControllerTest.java @@ -25,6 +25,7 @@ import java.util.List; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.metadata.model.SegmentRange.TSRange; import org.apache.kylin.rest.exception.InternalErrorException; import org.apache.kylin.rest.request.CubeRequest; import org.apache.kylin.rest.service.CubeService; @@ -35,10 +36,10 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; -import org.springframework.beans.factory.annotation.Qualifier; /** * @author xduo @@ -179,10 +180,10 @@ public class CubeControllerTest extends ServiceTestBase { CubeInstance cube = cubeService.getCubeManager().getCube(cubeName); List<CubeSegment> segments = cube.getSegments(); - final long dateEnd = segments.get(segments.size() -1).getDateRangeEnd(); + final long dateEnd = segments.get(segments.size() -1).getTSRange().end.v; final long ONEDAY = 24 * 60 * 60000; - cubeService.getCubeManager().appendSegment(cube, dateEnd + ONEDAY, dateEnd + ONEDAY * 2); + cubeService.getCubeManager().appendSegment(cube, new TSRange(dateEnd + ONEDAY, dateEnd + ONEDAY * 2)); List<CubeSegment> holes = cubeController.getHoles(cubeName); @@ -190,8 +191,7 @@ public class CubeControllerTest extends ServiceTestBase { CubeSegment hole = holes.get(0); - Assert.assertTrue(hole.getDateRangeStart() == dateEnd && hole.getDateRangeEnd() == (dateEnd + ONEDAY)); - + Assert.assertTrue(hole.getTSRange().equals(new TSRange(dateEnd, dateEnd + ONEDAY))); } http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java ---------------------------------------------------------------------- diff --git a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java index d059883..704e45d 100644 --- a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java +++ b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java @@ -40,6 +40,7 @@ import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.cachesync.Broadcaster; import org.apache.kylin.metadata.model.DataModelDesc; import org.apache.kylin.metadata.model.JoinTableDesc; +import org.apache.kylin.metadata.model.SegmentRange.TSRange; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.metadata.project.ProjectManager; @@ -238,7 +239,7 @@ public class CacheServiceTest extends LocalFileMetadataTestCase { CubeInstance cube = cubeManager.getCube(cubeName); assertEquals(0, cube.getSegments().size()); assertEquals(0, cubeManagerB.getCube(cubeName).getSegments().size()); - CubeSegment segment = cubeManager.appendSegment(cube, 0, 1000); + CubeSegment segment = cubeManager.appendSegment(cube, new TSRange(0L, 1000L)); //one for cube update assertEquals(1, broadcaster.getCounterAndClear()); waitForCounterAndClear(1); http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java index 6e45406..2fa2fdc 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java @@ -64,8 +64,7 @@ public class HiveSource implements ISource { @Override public SourcePartition enrichSourcePartitionBeforeBuild(IBuildable buildable, SourcePartition srcPartition) { SourcePartition result = SourcePartition.getCopyOf(srcPartition); - result.setStartOffset(0); - result.setEndOffset(0); + result.setSegRange(null); return result; } http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java index cd66837..5e06f90 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java +++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java @@ -53,8 +53,7 @@ public class JdbcSource implements ISource { @Override public SourcePartition enrichSourcePartitionBeforeBuild(IBuildable buildable, SourcePartition srcPartition) { SourcePartition result = SourcePartition.getCopyOf(srcPartition); - result.setStartOffset(0); - result.setEndOffset(0); + result.setSegRange(null); return result; } http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java index 161099e..fe07b6f 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java @@ -29,6 +29,7 @@ import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.mr.IMRInput; import org.apache.kylin.metadata.model.IBuildable; +import org.apache.kylin.metadata.model.SegmentRange; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TableExtDesc; import org.apache.kylin.metadata.streaming.StreamingConfig; @@ -68,8 +69,9 @@ public class KafkaSource implements ISource { public SourcePartition enrichSourcePartitionBeforeBuild(IBuildable buildable, SourcePartition srcPartition) { checkSourceOffsets(srcPartition); final SourcePartition result = SourcePartition.getCopyOf(srcPartition); + final SegmentRange range = result.getSegRange(); final CubeInstance cube = (CubeInstance) buildable; - if (result.getStartOffset() == 0) { + if (range == null || range.start.v.equals(0L)) { final CubeSegment last = cube.getLastSegment(); if (last != null) { logger.debug("Last segment exists, continue from last segment " + last.getName() + "'s end position: " + last.getSourcePartitionOffsetEnd()); @@ -101,7 +103,7 @@ public class KafkaSource implements ISource { } } - if (result.getEndOffset() == Long.MAX_VALUE) { + if (range == null || range.end.v.equals(Long.MAX_VALUE)) { logger.debug("Seek end offsets from topic"); Map<Integer, Long> latestOffsets = KafkaClient.getLatestOffsets(cube); logger.debug("The end offsets are " + latestOffsets); @@ -134,18 +136,20 @@ public class KafkaSource implements ISource { throw new IllegalArgumentException("No new message comes, startOffset = endOffset:" + totalStartOffset); } - result.setStartOffset(totalStartOffset); - result.setEndOffset(totalEndOffset); + result.setSegRange(new SegmentRange(totalStartOffset, totalEndOffset)); logger.debug("parsePartitionBeforeBuild() return: " + result); return result; } - private void checkSourceOffsets(SourcePartition srcPartition) { - long startOffset = srcPartition.getStartOffset(); - long endOffset = srcPartition.getEndOffset(); - final Map<Integer, Long> sourcePartitionOffsetStart = srcPartition.getSourcePartitionOffsetStart(); - final Map<Integer, Long> sourcePartitionOffsetEnd = srcPartition.getSourcePartitionOffsetEnd(); + private void checkSourceOffsets(SourcePartition src) { + if (src.getSegRange() == null) + return; + + long startOffset = (Long) src.getSegRange().start.v; + long endOffset = (Long) src.getSegRange().end.v; + final Map<Integer, Long> sourcePartitionOffsetStart = src.getSourcePartitionOffsetStart(); + final Map<Integer, Long> sourcePartitionOffsetEnd = src.getSourcePartitionOffsetEnd(); if (endOffset <= 0 || startOffset >= endOffset) { throw new IllegalArgumentException("'startOffset' need be smaller than 'endOffset'"); } http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java index 914fca2..e3a7586 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java @@ -19,9 +19,7 @@ package org.apache.kylin.source.kafka.job; import java.io.IOException; import java.util.Collections; -import java.util.List; -import com.google.common.base.Preconditions; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; @@ -31,9 +29,14 @@ import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableContext; import org.apache.kylin.job.execution.ExecuteResult; +import org.apache.kylin.metadata.model.SegmentRange; +import org.apache.kylin.metadata.model.SegmentRange.TSRange; +import org.apache.kylin.metadata.model.Segments; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; + /** */ public class MergeOffsetStep extends AbstractExecutable { @@ -52,7 +55,7 @@ public class MergeOffsetStep extends AbstractExecutable { final CubeSegment segment = cube.getSegmentById(segmentId); Preconditions.checkNotNull(segment, "Cube segment '" + segmentId + "' not found."); - List<CubeSegment> mergingSegs = cube.getMergingSegments(segment); + Segments<CubeSegment> mergingSegs = cube.getMergingSegments(segment); Preconditions.checkArgument(mergingSegs.size() > 0, "Merging segment not exist."); @@ -60,16 +63,11 @@ public class MergeOffsetStep extends AbstractExecutable { final CubeSegment first = mergingSegs.get(0); final CubeSegment last = mergingSegs.get(mergingSegs.size() - 1); - segment.setSourceOffsetStart(first.getSourceOffsetStart()); + segment.setSegRange(new SegmentRange(first.getSegRange().start, last.getSegRange().end)); segment.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetStart()); - segment.setSourceOffsetEnd(last.getSourceOffsetEnd()); segment.setSourcePartitionOffsetEnd(last.getSourcePartitionOffsetEnd()); - long dateRangeStart = CubeManager.minDateRangeStart(mergingSegs); - long dateRangeEnd = CubeManager.maxDateRangeEnd(mergingSegs); - - segment.setDateRangeStart(dateRangeStart); - segment.setDateRangeEnd(dateRangeEnd); + segment.setTSRange(new TSRange(mergingSegs.getTSStart(), mergingSegs.getTSEnd())); CubeUpdate cubeBuilder = new CubeUpdate(cube); cubeBuilder.setToUpdateSegs(segment); http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java index 1cdb2f8..ff8c487 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java @@ -156,17 +156,17 @@ public class ExtendCubeToHybridCLI { CubeSegment currentSeg = null; while (segmentIterator.hasNext()) { currentSeg = segmentIterator.next(); - if (partitionDateStr != null && (currentSeg.getDateRangeStart() >= partitionDate || currentSeg.getDateRangeEnd() > partitionDate)) { + if (partitionDateStr != null && (currentSeg.getTSRange().start.v >= partitionDate || currentSeg.getTSRange().end.v > partitionDate)) { segmentIterator.remove(); logger.info("CubeSegment[" + currentSeg + "] was removed."); } } - if (partitionDateStr != null && partitionDate != currentSeg.getDateRangeEnd()) { + if (partitionDateStr != null && partitionDate != currentSeg.getTSRange().end.v) { logger.error("PartitionDate must be end date of one segment."); return; } if (currentSeg != null && partitionDateStr == null) - partitionDate = currentSeg.getDateRangeEnd(); + partitionDate = currentSeg.getTSRange().end.v; cubeManager.createCube(newCubeInstance, projectName, owner); logger.info("CubeInstance was saved at: " + newCubeInstance.getResourcePath()); http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java ---------------------------------------------------------------------- diff --git a/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java b/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java index f52fc3e..d048dd7 100644 --- a/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java +++ b/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java @@ -152,17 +152,17 @@ public class ExtendCubeToHybridCLI { CubeSegment currentSeg = null; while (segmentIterator.hasNext()) { currentSeg = segmentIterator.next(); - if (partitionDateStr != null && (currentSeg.getDateRangeStart() >= partitionDate || currentSeg.getDateRangeEnd() > partitionDate)) { + if (partitionDateStr != null && (currentSeg.getTSRange().start.v >= partitionDate || currentSeg.getTSRange().end.v > partitionDate)) { segmentIterator.remove(); logger.info("CubeSegment[" + currentSeg + "] was removed."); } } - if (currentSeg != null && partitionDateStr != null && partitionDate != currentSeg.getDateRangeEnd()) { + if (currentSeg != null && partitionDateStr != null && partitionDate != currentSeg.getTSRange().end.v) { logger.error("PartitionDate must be end date of one segment."); return; } if (currentSeg != null && partitionDateStr == null) - partitionDate = currentSeg.getDateRangeEnd(); + partitionDate = currentSeg.getTSRange().end.v; cubeManager.createCube(newCubeInstance, projectName, owner); logger.info("CubeInstance was saved at: " + newCubeInstance.getResourcePath());