http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-metadata/src/main/java/org/apache/kylin/metadata/model/SegmentRange.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/model/SegmentRange.java 
b/core-metadata/src/main/java/org/apache/kylin/metadata/model/SegmentRange.java
new file mode 100644
index 0000000..e52da09
--- /dev/null
+++ 
b/core-metadata/src/main/java/org/apache/kylin/metadata/model/SegmentRange.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.model;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+/**
+ * SegmentRange and TSRange seem similar but are different concepts.
+ * 
+ * - SegmentRange defines the range of a segment.
+ * - TSRange is the time series range of the segment data.
+ * - When segment range is defined by time, the two can be the same, in that 
case TSRange is a kind of SegmentRange.
+ * - During segment creation (build/refresh/merge), a new segment is defined 
by either one of the two, not both.
+ * - And the choice must be consistent across all following segment creation.
+ */
+@SuppressWarnings("serial")
+public class SegmentRange<T extends Comparable> implements Serializable {
+
+    public final Endpoint<T> start;
+    public final Endpoint<T> end;
+
+    public SegmentRange(Endpoint start, Endpoint end) {
+        this.start = start;
+        this.end = end;
+        checkState();
+    }
+
+    public SegmentRange(T start, T end) {
+        if (start != null && end != null && start.getClass() != end.getClass())
+            throw new IllegalArgumentException();
+
+        this.start = new Endpoint(start, start == null, false);
+        this.end = new Endpoint(end, false, end == null);
+        checkState();
+    }
+
+    private void checkState() {
+        if (start.compareTo(end) > 0)
+            throw new IllegalStateException();
+    }
+
+    public boolean isInfinite() {
+        return start.isMin && end.isMax;
+    }
+
+    public boolean contains(SegmentRange o) {
+        return this.start.compareTo(o.start) <= 0 && o.end.compareTo(this.end) 
<= 0;
+    }
+
+    public boolean overlaps(SegmentRange o) {
+        return this.start.compareTo(o.end) < 0 && o.start.compareTo(this.end) 
< 0;
+    }
+
+    public boolean connects(SegmentRange o) {
+        return this.end.compareTo(o.start) == 0;
+    }
+
+    public boolean apartBefore(SegmentRange o) {
+        return this.end.compareTo(o.start) < 0;
+    }
+
+    @Override
+    public String toString() {
+        return this.getClass().getSimpleName() + "[" + start + "," + end + ")";
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((end == null) ? 0 : end.hashCode());
+        result = prime * result + ((start == null) ? 0 : start.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        SegmentRange other = (SegmentRange) obj;
+        if (end == null) {
+            if (other.end != null)
+                return false;
+        } else if (!end.equals(other.end))
+            return false;
+        if (start == null) {
+            if (other.start != null)
+                return false;
+        } else if (!start.equals(other.start))
+            return false;
+        return true;
+    }
+
+    // 
============================================================================
+
+    public static class TSRange extends SegmentRange<Long> {
+
+        public TSRange(Long start, Long end) {
+            // [0, Long.MAX_VALUE) is full build (for historic reason)
+            super(new Endpoint(isInfinite(start, end) ? 0 : start, 
isInfinite(start, end), false), //
+                    new Endpoint(isInfinite(start, end) ? Long.MAX_VALUE : 
end, false, isInfinite(start, end)));
+        }
+
+        private static boolean isInfinite(Long start, Long end) {
+            return (start == null || start <= 0) && (end == null || end == 
Long.MAX_VALUE);
+        }
+
+        public long duration() {
+            return end.v - start.v;
+        }
+    }
+
+    // 
============================================================================
+
+    // immutable
+    public static class Endpoint<T extends Comparable> implements 
Comparable<Endpoint>, Serializable {
+
+        public static final Comparator<Endpoint> comparator = 
getComparator(new Comparator() {
+            @Override
+            public int compare(Object o1, Object o2) {
+                return ((Comparable) o1).compareTo(o2);
+            }
+        });
+
+        public static Comparator<Endpoint> getComparator(final Comparator 
valueComparator) {
+            return new Comparator<Endpoint>() {
+                @Override
+                public int compare(Endpoint a, Endpoint b) {
+                    if (a.isMin) {
+                        return b.isMin ? 0 : -1;
+                    } else if (b.isMin) {
+                        return a.isMin ? 0 : 1;
+                    } else if (a.isMax) {
+                        return b.isMax ? 0 : 1;
+                    } else if (b.isMax) {
+                        return a.isMax ? 0 : -1;
+                    } else {
+                        if (a == null || b == null)
+                            throw new IllegalStateException();
+
+                        return valueComparator.compare(a.v, b.v);
+                    }
+                }
+            };
+        }
+
+        public final T v;
+        public final boolean isMin;
+        public final boolean isMax;
+
+        private Endpoint(T v, boolean isMin, boolean isMax) {
+            this.v = v;
+            this.isMin = isMin;
+            this.isMax = isMax;
+        }
+
+        @Override
+        public int compareTo(Endpoint o) {
+            return comparator.compare(this, o);
+        }
+        
+        @Override
+        public String toString() {
+            String s = "" + v;
+            if (isMin)
+                s += "[min]";
+            if (isMax)
+                s += "[max]";
+            return s;
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + (isMax ? 1231 : 1237);
+            result = prime * result + (isMin ? 1231 : 1237);
+            result = prime * result + ((v == null) ? 0 : v.hashCode());
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj)
+                return true;
+            if (obj == null)
+                return false;
+            if (getClass() != obj.getClass())
+                return false;
+            Endpoint other = (Endpoint) obj;
+            if (isMax != other.isMax)
+                return false;
+            if (isMin != other.isMin)
+                return false;
+
+            return comparator.compare(this, other) == 0;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java 
b/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java
index 73a385c..7af1ebc 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java
@@ -23,25 +23,42 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 
+import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.metadata.model.SegmentRange.Endpoint;
+import org.apache.kylin.metadata.model.SegmentRange.TSRange;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
+
 public class Segments<T extends ISegment> extends ArrayList<T> implements 
Serializable{
 
     private static final long serialVersionUID = 1L;
 
     private static final Logger logger = 
LoggerFactory.getLogger(Segments.class);
 
-    public static boolean sourceOffsetContains(ISegment a, ISegment b) {
-        return a.getSourceOffsetStart() <= b.getSourceOffsetStart() && 
b.getSourceOffsetEnd() <= a.getSourceOffsetEnd();
+    public static ISegmentAdvisor newSegmentAdvisor(ISegment seg) {
+        try {
+            Class<? extends ISegmentAdvisor> clz = 
ClassUtil.forName(seg.getConfig().getSegmentAdvisor(), ISegmentAdvisor.class);
+            return clz.getConstructor(ISegment.class).newInstance(seg);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
     }
-
-    public static boolean sourceOffsetOverlaps(ISegment a, ISegment b) {
-        return a.getSourceOffsetStart() < b.getSourceOffsetEnd() && 
b.getSourceOffsetStart() < a.getSourceOffsetEnd();
+    
+    // 
============================================================================
+    
+    public Segments() {
+        super();
     }
 
+    public Segments(Segments<T> copy) {
+        super(copy);
+    }
+    
     public T getFirstSegment() {
         if (this == null || this.size() == 0) {
             return null;
@@ -49,24 +66,24 @@ public class Segments<T extends ISegment> extends 
ArrayList<T> implements Serial
             return this.get(0);
         }
     }
-
-    public long getDateRangeStart() {
+    
+    public long getTSStart() {
         Segments<T> readySegs = getSegments(SegmentStatusEnum.READY);
 
         long startTime = Long.MAX_VALUE;
         for (ISegment seg : readySegs) {
-            startTime = Math.min(startTime, seg.getDateRangeStart());
+            startTime = Math.min(startTime, seg.getTSRange().start.v);
         }
 
         return startTime;
     }
 
-    public long getDateRangeEnd() {
+    public long getTSEnd() {
         Segments<T> readySegs = getSegments(SegmentStatusEnum.READY);
 
         long endTime = Long.MIN_VALUE;
         for (ISegment seg : readySegs) {
-            endTime = Math.max(endTime, seg.getDateRangeEnd());
+            endTime = Math.max(endTime, seg.getTSRange().end.v);
         }
 
         return endTime;
@@ -78,7 +95,7 @@ public class Segments<T extends ISegment> extends 
ArrayList<T> implements Serial
             T seg = this.get(i);
             if (seg.getStatus() != SegmentStatusEnum.READY)
                 continue;
-            if (latest == null || latest.getDateRangeEnd() < 
seg.getDateRangeEnd()) {
+            if (latest == null || latest.getTSRange().end.v < 
seg.getTSRange().end.v) {
                 latest = seg;
             }
         }
@@ -129,7 +146,7 @@ public class Segments<T extends ISegment> extends 
ArrayList<T> implements Serial
         return buildingSegments;
     }
 
-    public Segments getMergingSegments(T mergedSegment) {
+    public Segments<T> getMergingSegments(T mergedSegment) {
         Segments<T> result = new Segments();
         if (mergedSegment == null)
             return result;
@@ -141,9 +158,9 @@ public class Segments<T extends ISegment> extends 
ArrayList<T> implements Serial
             if (seg == mergedSegment)
                 continue;
 
-            if (sourceOffsetContains(mergedSegment, seg)) {
+            if (mergedSegment.getSegRange().contains(seg.getSegRange())) {
                 // make sure no holes
-                if (result.size() > 0 && result.getLast().getSourceOffsetEnd() 
!= seg.getSourceOffsetStart())
+                if (result.size() > 0 && 
!result.getLast().getSegRange().connects(seg.getSegRange()))
                     throw new IllegalStateException("Merging segments must not 
have gaps between " + result.getLast() + " and " + seg);
 
                 result.add(seg);
@@ -152,7 +169,7 @@ public class Segments<T extends ISegment> extends 
ArrayList<T> implements Serial
         return result;
     }
 
-    public Pair<Long, Long> autoMergeCubeSegments(boolean needAutoMerge, 
String cubeName, long[] timeRanges) throws IOException {
+    public SegmentRange autoMergeCubeSegments(boolean needAutoMerge, String 
cubeName, long[] timeRanges) throws IOException {
         if (!needAutoMerge) {
             logger.debug("Cube " + cubeName + " doesn't need auto merge");
             return null;
@@ -171,7 +188,7 @@ public class Segments<T extends ISegment> extends 
ArrayList<T> implements Serial
             for (ISegment building : getBuildingSegments()) {
                 // exclude those under-merging segs
                 for (ISegment ready : readySegs) {
-                    if (ready.getSourceOffsetStart() >= 
building.getSourceOffsetStart() && ready.getSourceOffsetEnd() <= 
building.getSourceOffsetEnd()) {
+                    if (building.getSegRange().contains(ready.getSegRange())) {
                         mergingSegs.add(ready);
                     }
                 }
@@ -188,30 +205,31 @@ public class Segments<T extends ISegment> extends 
ArrayList<T> implements Serial
 
             for (int s = 0; s < readySegs.size(); s++) {
                 ISegment seg = readySegs.get(s);
+                TSRange tsRange = new TSRange(seg.getTSRange().start.v, 
seg.getTSRange().start.v + toMergeRange);
                 Pair<T, T> p = readySegs.getSubList(s, readySegs.size()) //
-                        .findMergeOffsetsByDateRange(seg.getDateRangeStart(), 
seg.getDateRangeStart() + toMergeRange, toMergeRange);
-                if (p != null && p.getSecond().getDateRangeEnd() - 
p.getFirst().getDateRangeStart() >= toMergeRange)
-                    return Pair.newPair(p.getFirst().getSourceOffsetStart(), 
p.getSecond().getSourceOffsetEnd());
+                        .findMergeOffsetsByDateRange(tsRange, toMergeRange);
+                if (p != null && p.getSecond().getTSRange().end.v - 
p.getFirst().getTSRange().start.v >= toMergeRange)
+                    return new 
SegmentRange(p.getFirst().getSegRange().start.v, 
p.getSecond().getSegRange().end.v);
             }
         }
 
         return null;
     }
 
-    public Pair<T, T> findMergeOffsetsByDateRange(long startDate, long 
endDate, long skipSegDateRangeCap) {
+    public Pair<T, T> findMergeOffsetsByDateRange(TSRange tsRange, long 
skipSegDateRangeCap) {
         // must be offset cube
         Segments result = new Segments();
         for (ISegment seg : this) {
 
             // include if date range overlaps
-            if (startDate < seg.getDateRangeEnd() && seg.getDateRangeStart() < 
endDate) {
+            if (tsRange.overlaps(seg.getTSRange())) {
 
                 // reject too big segment
-                if (seg.getDateRangeEnd() - seg.getDateRangeStart() > 
skipSegDateRangeCap)
+                if (seg.getTSRange().duration() > skipSegDateRangeCap)
                     break;
 
                 // reject holes
-                if (result.size() > 0 && result.getLast().getSourceOffsetEnd() 
!= seg.getSourceOffsetStart())
+                if (result.size() > 0 && 
!result.getLast().getSegRange().connects(seg.getSegRange()))
                     break;
 
                 result.add(seg);
@@ -262,11 +280,11 @@ public class Segments<T extends ISegment> extends 
ArrayList<T> implements Serial
                 continue;
             }
 
-            if (is.getSourceOffsetStart() == js.getSourceOffsetStart()) {
+            if (is.getSegRange().start.compareTo(js.getSegRange().start) == 0) 
{
                 // if i, j competes
                 if (isReady(is) && isReady(js) || isNew(is) && isNew(js)) {
                     // if both new or ready, favor the bigger segment
-                    if (is.getSourceOffsetEnd() <= js.getSourceOffsetEnd()) {
+                    if (is.getSegRange().end.compareTo(js.getSegRange().end) 
<= 0) {
                         tobe.remove(i);
                     } else {
                         tobe.remove(j);
@@ -285,7 +303,7 @@ public class Segments<T extends ISegment> extends 
ArrayList<T> implements Serial
             }
 
             // if i, j in sequence
-            if (is.getSourceOffsetEnd() <= js.getSourceOffsetStart()) {
+            if (is.getSegRange().end.compareTo(js.getSegRange().start) <= 0) {
                 i++;
                 j++;
                 continue;
@@ -333,4 +351,100 @@ public class Segments<T extends ISegment> extends 
ArrayList<T> implements Serial
         return result;
     }
 
+    /**
+     * Validates:
+     * - consistent isOffsetCube()
+     * - for all ready segments, sourceOffset MUST have no overlaps, SHOULD 
have no holes
+     * - for all new segments, sourceOffset MUST have no overlaps, MUST 
contain a ready segment if overlaps with it
+     * - for all new segments, sourceOffset SHOULD fit/connect other segments
+     * - dateRange does not matter any more
+     */
+    public void validate() {
+        if (this.isEmpty())
+            return;
+
+        // make a copy, don't modify existing list
+        Segments<T> all = new Segments<T>(this);
+        Collections.sort(all);
+
+        // check consistent isOffsetCube()
+        boolean isOffsetCube = all.get(0).isOffsetCube();
+        for (ISegment seg : all) {
+            seg.validate();
+            if (seg.isOffsetCube() != isOffsetCube)
+                throw new IllegalStateException("Inconsistent isOffsetsOn for 
segment " + seg);
+        }
+
+        List<ISegment> ready = Lists.newArrayListWithCapacity(all.size());
+        List<ISegment> news = Lists.newArrayListWithCapacity(all.size());
+        for (ISegment seg : all) {
+            if (seg.getStatus() == SegmentStatusEnum.READY)
+                ready.add(seg);
+            else
+                news.add(seg);
+        }
+
+        // for all ready segments, sourceOffset MUST have no overlaps, SHOULD 
have no holes
+        ISegment pre = null;
+        for (ISegment seg : ready) {
+            if (pre != null) {
+                if (pre.getSegRange().overlaps(seg.getSegRange()))
+                    throw new IllegalStateException("Segments overlap: " + pre 
+ " and " + seg);
+                if (pre.getSegRange().apartBefore(seg.getSegRange()))
+                    logger.warn("Hole between adjacent READY segments " + pre 
+ " and " + seg);
+            }
+            pre = seg;
+        }
+
+        // for all other segments, sourceOffset MUST have no overlaps, MUST 
contain a ready segment if overlaps with it
+        pre = null;
+        for (ISegment seg : news) {
+            if (pre != null) {
+                if (pre.getSegRange().overlaps(seg.getSegRange()))
+                    throw new IllegalStateException("Segments overlap: " + pre 
+ " and " + seg);
+            }
+            pre = seg;
+
+            for (ISegment aReady : ready) {
+                if (seg.getSegRange().overlaps(aReady.getSegRange()) && 
!seg.getSegRange().contains(aReady.getSegRange()))
+                    throw new IllegalStateException("Segments overlap: " + 
aReady + " and " + seg);
+            }
+        }
+
+        // for all other segments, sourceOffset SHOULD fit/connect other 
segments
+        for (ISegment seg : news) {
+            Pair<Boolean, Boolean> pair = all.fitInSegments(seg);
+            boolean startFit = pair.getFirst();
+            boolean endFit = pair.getSecond();
+
+            if (!startFit)
+                logger.warn("NEW segment start does not fit/connect with other 
segments: " + seg);
+            if (!endFit)
+                logger.warn("NEW segment end does not fit/connect with other 
segments: " + seg);
+        }
+    }
+
+    public Pair<Boolean, Boolean> fitInSegments(ISegment newOne) {
+        if (this.isEmpty())
+            return null;
+
+        ISegment first = this.get(0);
+        ISegment last = this.get(this.size() - 1);
+        Endpoint start = newOne.getSegRange().start;
+        Endpoint end = newOne.getSegRange().end;
+        boolean startFit = false;
+        boolean endFit = false;
+        for (ISegment sss : this) {
+            if (sss == newOne)
+                continue;
+            startFit = startFit || (start.equals(sss.getSegRange().start) || 
start.equals(sss.getSegRange().end));
+            endFit = endFit || (end.equals(sss.getSegRange().start) || 
end.equals(sss.getSegRange().end));
+        }
+        if (!startFit && endFit && newOne == first)
+            startFit = true;
+        if (!endFit && startFit && newOne == last)
+            endFit = true;
+
+        return Pair.newPair(startFit, endFit);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java 
b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java
index f4aec12..5c2ebac 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java
@@ -25,6 +25,7 @@ import java.util.Map;
 
 import com.google.common.collect.Maps;
 
+@SuppressWarnings("serial")
 public class TableRef implements Serializable {
 
     final transient private DataModelDesc model;

http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java 
b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
index 5563856..e7250a3 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
@@ -27,7 +27,7 @@ import org.apache.kylin.metadata.datatype.DataType;
 
 /**
  */
-@SuppressWarnings({ "serial", "deprecation" })
+@SuppressWarnings({ "serial" })
 public class TblColRef implements Serializable {
 
     private static final String INNER_TABLE_NAME = "_kylin_table";

http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java 
b/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java
index 43e46c6..b6d91ab 100644
--- a/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java
+++ b/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java
@@ -21,6 +21,9 @@ package org.apache.kylin.source;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.kylin.metadata.model.SegmentRange;
+import org.apache.kylin.metadata.model.SegmentRange.TSRange;
+
 import com.google.common.base.Objects;
 
 /**
@@ -34,55 +37,35 @@ import com.google.common.base.Objects;
  * its own start and end offset within that partition.
  */
 public class SourcePartition {
-    long startDate;
-    long endDate;
-    long startOffset;
-    long endOffset;
+    TSRange tsRange;
+    SegmentRange segRange;
     Map<Integer, Long> sourcePartitionOffsetStart;
     Map<Integer, Long> sourcePartitionOffsetEnd;
 
     public SourcePartition() {
     }
 
-    public SourcePartition(long startDate, long endDate, long startOffset, 
long endOffset, Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, 
Long> sourcePartitionOffsetEnd) {
-        this.startDate = startDate;
-        this.endDate = endDate;
-        this.startOffset = startOffset;
-        this.endOffset = endOffset;
+    public SourcePartition(TSRange tsRange, SegmentRange segRange, 
Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> 
sourcePartitionOffsetEnd) {
+        this.tsRange = tsRange;
+        this.segRange = segRange;
         this.sourcePartitionOffsetStart = sourcePartitionOffsetStart;
         this.sourcePartitionOffsetEnd = sourcePartitionOffsetEnd;
     }
 
-    public long getStartDate() {
-        return startDate;
-    }
-
-    public void setStartDate(long startDate) {
-        this.startDate = startDate;
-    }
-
-    public long getEndDate() {
-        return endDate;
-    }
-
-    public void setEndDate(long endDate) {
-        this.endDate = endDate;
-    }
-
-    public long getStartOffset() {
-        return startOffset;
+    public TSRange getTSRange() {
+        return tsRange;
     }
 
-    public void setStartOffset(long startOffset) {
-        this.startOffset = startOffset;
+    public void setTSRange(TSRange tsRange) {
+        this.tsRange = tsRange;
     }
 
-    public long getEndOffset() {
-        return endOffset;
+    public SegmentRange getSegRange() {
+        return segRange;
     }
 
-    public void setEndOffset(long endOffset) {
-        this.endOffset = endOffset;
+    public void setSegRange(SegmentRange segRange) {
+        this.segRange = segRange;
     }
 
     public Map<Integer, Long> getSourcePartitionOffsetStart() {
@@ -103,15 +86,13 @@ public class SourcePartition {
 
     @Override
     public String toString() {
-        return Objects.toStringHelper(this).add("startDate", 
startDate).add("endDate", endDate).add("startOffset", 
startOffset).add("endOffset", endOffset).add("sourcePartitionOffsetStart", 
sourcePartitionOffsetStart.toString()).add("sourcePartitionOffsetEnd", 
sourcePartitionOffsetEnd.toString()).toString();
+        return Objects.toStringHelper(this).add("tsRange", 
tsRange).add("segRange", segRange).add("sourcePartitionOffsetStart", 
sourcePartitionOffsetStart.toString()).add("sourcePartitionOffsetEnd", 
sourcePartitionOffsetEnd.toString()).toString();
     }
 
     public static SourcePartition getCopyOf(SourcePartition origin) {
         SourcePartition copy = new SourcePartition();
-        copy.setStartDate(origin.getStartDate());
-        copy.setEndDate(origin.getEndDate());
-        copy.setStartOffset(origin.getStartOffset());
-        copy.setEndOffset(origin.getEndOffset());
+        copy.setTSRange(origin.getTSRange());
+        copy.setSegRange(origin.getSegRange());
         if (origin.getSourcePartitionOffsetStart() != null) {
             copy.setSourcePartitionOffsetStart(new 
HashMap<>(origin.getSourcePartitionOffsetStart()));
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-metadata/src/test/java/org/apache/kylin/metadata/model/DefaultPartitionConditionBuilderTest.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/test/java/org/apache/kylin/metadata/model/DefaultPartitionConditionBuilderTest.java
 
b/core-metadata/src/test/java/org/apache/kylin/metadata/model/DefaultPartitionConditionBuilderTest.java
index 907e0e5..19d6f2f 100644
--- 
a/core-metadata/src/test/java/org/apache/kylin/metadata/model/DefaultPartitionConditionBuilderTest.java
+++ 
b/core-metadata/src/test/java/org/apache/kylin/metadata/model/DefaultPartitionConditionBuilderTest.java
@@ -20,6 +20,7 @@ package org.apache.kylin.metadata.model;
 
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.metadata.model.SegmentRange.TSRange;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -51,10 +52,9 @@ public class DefaultPartitionConditionBuilderTest extends 
LocalFileMetadataTestC
         partitionDesc.setPartitionDateColumnRef(col);
         partitionDesc.setPartitionDateColumn(col.getCanonicalName());
         partitionDesc.setPartitionDateFormat("yyyy-MM-dd");
-        String condition = 
partitionConditionBuilder.buildDateRangeCondition(partitionDesc,
-                DateFormat.stringToMillis("2016-02-22"), 
DateFormat.stringToMillis("2016-02-23"));
-        Assert.assertEquals("UNKNOWN_ALIAS.DATE_COLUMN >= '2016-02-22' AND 
UNKNOWN_ALIAS.DATE_COLUMN < '2016-02-23'",
-                condition);
+        TSRange range = new TSRange(DateFormat.stringToMillis("2016-02-22"), 
DateFormat.stringToMillis("2016-02-23"));
+        String condition = 
partitionConditionBuilder.buildDateRangeCondition(partitionDesc, range);
+        Assert.assertEquals("UNKNOWN_ALIAS.DATE_COLUMN >= '2016-02-22' AND 
UNKNOWN_ALIAS.DATE_COLUMN < '2016-02-23'", condition);
     }
 
     @Test
@@ -64,8 +64,8 @@ public class DefaultPartitionConditionBuilderTest extends 
LocalFileMetadataTestC
         partitionDesc.setPartitionTimeColumnRef(col);
         partitionDesc.setPartitionTimeColumn(col.getCanonicalName());
         partitionDesc.setPartitionTimeFormat("HH");
-        String condition = 
partitionConditionBuilder.buildDateRangeCondition(partitionDesc,
-                DateFormat.stringToMillis("2016-02-22 00:00:00"), 
DateFormat.stringToMillis("2016-02-23 01:00:00"));
+        TSRange range = new TSRange(DateFormat.stringToMillis("2016-02-22 
00:00:00"), DateFormat.stringToMillis("2016-02-23 01:00:00"));
+        String condition = 
partitionConditionBuilder.buildDateRangeCondition(partitionDesc, range);
         Assert.assertEquals("UNKNOWN_ALIAS.HOUR_COLUMN >= '00' AND 
UNKNOWN_ALIAS.HOUR_COLUMN < '01'", condition);
     }
 
@@ -80,11 +80,9 @@ public class DefaultPartitionConditionBuilderTest extends 
LocalFileMetadataTestC
         partitionDesc.setPartitionTimeColumnRef(col2);
         partitionDesc.setPartitionTimeColumn(col2.getCanonicalName());
         partitionDesc.setPartitionTimeFormat("H");
-        String condition = 
partitionConditionBuilder.buildDateRangeCondition(partitionDesc,
-                DateFormat.stringToMillis("2016-02-22 00:00:00"), 
DateFormat.stringToMillis("2016-02-23 01:00:00"));
-        Assert.assertEquals(
-                "((UNKNOWN_ALIAS.DATE_COLUMN = '2016-02-22' AND 
UNKNOWN_ALIAS.HOUR_COLUMN >= '0') OR (UNKNOWN_ALIAS.DATE_COLUMN > 
'2016-02-22')) AND ((UNKNOWN_ALIAS.DATE_COLUMN = '2016-02-23' AND 
UNKNOWN_ALIAS.HOUR_COLUMN < '1') OR (UNKNOWN_ALIAS.DATE_COLUMN < 
'2016-02-23'))",
-                condition);
+        TSRange range = new TSRange(DateFormat.stringToMillis("2016-02-22 
00:00:00"), DateFormat.stringToMillis("2016-02-23 01:00:00"));
+        String condition = 
partitionConditionBuilder.buildDateRangeCondition(partitionDesc, range);
+        Assert.assertEquals("((UNKNOWN_ALIAS.DATE_COLUMN = '2016-02-22' AND 
UNKNOWN_ALIAS.HOUR_COLUMN >= '0') OR (UNKNOWN_ALIAS.DATE_COLUMN > 
'2016-02-22')) AND ((UNKNOWN_ALIAS.DATE_COLUMN = '2016-02-23' AND 
UNKNOWN_ALIAS.HOUR_COLUMN < '1') OR (UNKNOWN_ALIAS.DATE_COLUMN < 
'2016-02-23'))", condition);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java
----------------------------------------------------------------------
diff --git 
a/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java
 
b/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java
index bfddb1f..5db3611 100644
--- 
a/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java
+++ 
b/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java
@@ -268,6 +268,6 @@ public class HBaseKeyRange implements 
Comparable<HBaseKeyRange> {
     }
 
     public boolean hitSegment() {
-        return cubeSeg.getDateRangeStart() <= getPartitionColumnEndDate() && 
cubeSeg.getDateRangeEnd() >= getPartitionColumnStartDate();
+        return cubeSeg.getTSRange().start.v <= getPartitionColumnEndDate() && 
cubeSeg.getTSRange().end.v >= getPartitionColumnStartDate();
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
index 2efd718..dd221f1 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
@@ -36,6 +36,7 @@ import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.model.SegmentRange.TSRange;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -67,7 +68,7 @@ public class UpdateCubeInfoAfterBuildStep extends 
AbstractExecutable {
         segment.setInputRecordsSize(sourceSizeBytes);
 
         try {
-            if (segment.isSourceOffsetsOn()) {
+            if (segment.isOffsetCube()) {
                 updateTimeRange(segment);
             }
 
@@ -108,8 +109,8 @@ public class UpdateCubeInfoAfterBuildStep extends 
AbstractExecutable {
             IOUtils.closeQuietly(isr);
             IOUtils.closeQuietly(bufferedReader);
         }
+        
         logger.info("updateTimeRange step. minValue:" + minValue + " 
maxValue:" + maxValue);
-        segment.setDateRangeStart(minValue);
-        segment.setDateRangeEnd(maxValue);
+        segment.setTSRange(new TSRange(minValue, maxValue));
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java
 
b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java
index 04af4fe..a1815e2 100644
--- 
a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java
+++ 
b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java
@@ -41,6 +41,7 @@ import org.apache.kylin.dict.DictionaryManager;
 import org.apache.kylin.dict.IterableDictionaryValueEnumerator;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.SegmentRange.TSRange;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.source.IReadableTable.TableSignature;
@@ -158,7 +159,7 @@ public class MergeCuboidMapperTest extends 
LocalFileMetadataTestCase {
 
         //        String cubeName = 
"test_kylin_cube_without_slr_left_join_ready_2_segments";
 
-        CubeSegment newSeg = cubeManager.mergeSegments(cube, 0L, 
Long.MAX_VALUE, 0, 0, false);
+        CubeSegment newSeg = cubeManager.mergeSegments(cube, new TSRange(0L, 
Long.MAX_VALUE), null, false);
         //        String segmentName = newSeg.getName();
 
         final Dictionary<String> dictionary = 
cubeManager.getDictionary(newSeg, lfn);

http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
----------------------------------------------------------------------
diff --git 
a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java 
b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
index 4ff303d..42fc124 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
@@ -55,10 +55,8 @@ import 
org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
+import org.apache.kylin.metadata.model.SegmentRange.TSRange;
 import org.apache.kylin.rest.job.StorageCleanupJob;
-import org.apache.kylin.source.ISource;
-import org.apache.kylin.source.SourceFactory;
-import org.apache.kylin.source.SourcePartition;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.apache.kylin.storage.hbase.util.HBaseRegionSizeCalculator;
 import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
@@ -243,7 +241,6 @@ public class BuildCubeWithEngine {
             this.countDownLatch = countDownLatch;
         }
 
-        @SuppressWarnings("unchecked")
         @Override
         public Boolean call() throws Exception {
             try {
@@ -259,12 +256,10 @@ public class BuildCubeWithEngine {
         }
     }
 
-    @SuppressWarnings("unused")
     protected boolean testTableExt() throws Exception {
         return true;
     }
 
-    @SuppressWarnings("unused")
     protected boolean testModel() throws Exception {
         return true;
     }
@@ -322,6 +317,7 @@ public class BuildCubeWithEngine {
         return false;
     }
 
+    @SuppressWarnings("unused")
     private void updateCubeEngineType(String cubeName) throws IOException {
         CubeDesc cubeDesc = cubeDescManager.getCubeDesc(cubeName);
         if (cubeDesc.getEngineType() != engineType) {
@@ -339,7 +335,7 @@ public class BuildCubeWithEngine {
     }
 
     private Boolean mergeSegment(String cubeName, long startDate, long 
endDate) throws Exception {
-        CubeSegment segment = 
cubeManager.mergeSegments(cubeManager.getCube(cubeName), startDate, endDate, 0, 
0, true);
+        CubeSegment segment = 
cubeManager.mergeSegments(cubeManager.getCube(cubeName), new TSRange(startDate, 
endDate), null, true);
         DefaultChainedExecutable job = 
EngineFactory.createBatchMergeJob(segment, "TEST");
         jobService.addJob(job);
         ExecutableState state = waitForJob(job.getId());
@@ -348,9 +344,7 @@ public class BuildCubeWithEngine {
 
     private Boolean buildSegment(String cubeName, long startDate, long 
endDate) throws Exception {
         CubeInstance cubeInstance = cubeManager.getCube(cubeName);
-        ISource source = SourceFactory.getSource(cubeInstance);
-        SourcePartition partition = 
source.enrichSourcePartitionBeforeBuild(cubeInstance, new SourcePartition(0, 
endDate, 0, 0, null, null));
-        CubeSegment segment = cubeManager.appendSegment(cubeInstance, 
partition.getStartDate(), partition.getEndDate());
+        CubeSegment segment = cubeManager.appendSegment(cubeInstance, new 
TSRange(0L, endDate));
         DefaultChainedExecutable job = 
EngineFactory.createBatchCubingJob(segment, "TEST");
         jobService.addJob(job);
         ExecutableState state = waitForJob(job.getId());

http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git 
a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java 
b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index 9c80413..239b4af 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -57,9 +57,11 @@ import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
 import org.apache.kylin.job.streaming.Kafka10DataLoader;
+import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.streaming.StreamingConfig;
 import org.apache.kylin.metadata.streaming.StreamingManager;
+import org.apache.kylin.rest.job.StorageCleanupJob;
 import org.apache.kylin.source.ISource;
 import org.apache.kylin.source.SourceFactory;
 import org.apache.kylin.source.SourcePartition;
@@ -68,7 +70,6 @@ import org.apache.kylin.source.kafka.config.BrokerConfig;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
 import org.apache.kylin.storage.hbase.util.ZookeeperUtil;
-import org.apache.kylin.rest.job.StorageCleanupJob;
 import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -245,9 +246,9 @@ public class BuildCubeWithStream {
         Assert.assertTrue(segments.size() == succeedBuild);
 
         if (fastBuildMode == false) {
-            long endOffset = segments.get(segments.size() - 
1).getSourceOffsetEnd();
+            long endOffset = (Long) segments.get(segments.size() - 
1).getSegRange().end.v;
             //merge
-            ExecutableState result = mergeSegment(cubeName, 0, endOffset);
+            ExecutableState result = mergeSegment(cubeName, new 
SegmentRange(0L, endOffset));
             Assert.assertTrue(result == ExecutableState.SUCCEED);
 
             segments = cubeManager.getCube(cubeName).getSegments();
@@ -255,7 +256,7 @@ public class BuildCubeWithStream {
 
             CubeSegment toRefreshSeg = segments.get(0);
 
-            refreshSegment(cubeName, toRefreshSeg.getSourceOffsetStart(), 
toRefreshSeg.getSourceOffsetEnd());
+            refreshSegment(cubeName, toRefreshSeg.getSegRange());
             segments = cubeManager.getCube(cubeName).getSegments();
             Assert.assertTrue(segments.size() == 1);
         }
@@ -263,16 +264,16 @@ public class BuildCubeWithStream {
         logger.info("Build is done");
     }
 
-    private ExecutableState mergeSegment(String cubeName, long startOffset, 
long endOffset) throws Exception {
-        CubeSegment segment = 
cubeManager.mergeSegments(cubeManager.getCube(cubeName), 0, 0, startOffset, 
endOffset, false);
+    private ExecutableState mergeSegment(String cubeName, SegmentRange 
segRange) throws Exception {
+        CubeSegment segment = 
cubeManager.mergeSegments(cubeManager.getCube(cubeName), null, segRange, false);
         DefaultChainedExecutable job = 
EngineFactory.createBatchMergeJob(segment, "TEST");
         jobService.addJob(job);
         waitForJob(job.getId());
         return job.getStatus();
     }
 
-    private String refreshSegment(String cubeName, long startOffset, long 
endOffset) throws Exception {
-        CubeSegment segment = 
cubeManager.refreshSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, 
endOffset);
+    private String refreshSegment(String cubeName, SegmentRange segRange) 
throws Exception {
+        CubeSegment segment = 
cubeManager.refreshSegment(cubeManager.getCube(cubeName), null, segRange);
         DefaultChainedExecutable job = 
EngineFactory.createBatchCubingJob(segment, "TEST");
         jobService.addJob(job);
         waitForJob(job.getId());
@@ -282,7 +283,7 @@ public class BuildCubeWithStream {
     protected ExecutableState buildSegment(String cubeName, long startOffset, 
long endOffset) throws Exception {
         CubeInstance cubeInstance = cubeManager.getCube(cubeName);
         ISource source = SourceFactory.getSource(cubeInstance);
-        SourcePartition partition = 
source.enrichSourcePartitionBeforeBuild(cubeInstance, new SourcePartition(0, 0, 
startOffset, endOffset, null, null));
+        SourcePartition partition = 
source.enrichSourcePartitionBeforeBuild(cubeInstance, new SourcePartition(null, 
new SegmentRange(startOffset, endOffset), null, null));
         CubeSegment segment = 
cubeManager.appendSegment(cubeManager.getCube(cubeName), partition);
         DefaultChainedExecutable job = 
EngineFactory.createBatchCubingJob(segment, "TEST");
         jobService.addJob(job);

http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/query/.settings/org.eclipse.core.resources.prefs
----------------------------------------------------------------------
diff --git a/query/.settings/org.eclipse.core.resources.prefs 
b/query/.settings/org.eclipse.core.resources.prefs
index 29abf99..839d647 100644
--- a/query/.settings/org.eclipse.core.resources.prefs
+++ b/query/.settings/org.eclipse.core.resources.prefs
@@ -2,5 +2,4 @@ eclipse.preferences.version=1
 encoding//src/main/java=UTF-8
 encoding//src/main/resources=UTF-8
 encoding//src/test/java=UTF-8
-encoding//src/test/resources=UTF-8
 encoding/<project>=UTF-8

http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
 
b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index 4244cf3..14014fc 100644
--- 
a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ 
b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -41,6 +41,8 @@ import org.apache.kylin.job.JobInstance;
 import org.apache.kylin.job.JoinedFlatTable;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.ISourceAware;
+import org.apache.kylin.metadata.model.SegmentRange;
+import org.apache.kylin.metadata.model.SegmentRange.TSRange;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.rest.exception.BadRequestException;
@@ -270,8 +272,8 @@ public class CubeController extends BasicController {
     @RequestMapping(value = "/{cubeName}/rebuild", method = { 
RequestMethod.PUT }, produces = { "application/json" })
     @ResponseBody
     public JobInstance rebuild(@PathVariable String cubeName, @RequestBody 
JobBuildRequest req) {
-        return buildInternal(cubeName, req.getStartTime(), req.getEndTime(), 
0, 0, null, null, req.getBuildType(),
-                req.isForce() || req.isForceMergeEmptySegment());
+        return buildInternal(cubeName, new TSRange(req.getStartTime(), 
req.getEndTime()), null, null, null, 
+                req.getBuildType(), req.isForce() || 
req.isForceMergeEmptySegment());
     }
 
     /** Build/Rebuild a cube segment by source offset */
@@ -297,14 +299,14 @@ public class CubeController extends BasicController {
     @RequestMapping(value = "/{cubeName}/rebuild2", method = { 
RequestMethod.PUT }, produces = { "application/json" })
     @ResponseBody
     public JobInstance rebuild2(@PathVariable String cubeName, @RequestBody 
JobBuildRequest2 req) {
-        return buildInternal(cubeName, 0, 0, req.getSourceOffsetStart(), 
req.getSourceOffsetEnd(),
+        return buildInternal(cubeName, null, new 
SegmentRange(req.getSourceOffsetStart(), req.getSourceOffsetEnd()),
                 req.getSourcePartitionOffsetStart(), 
req.getSourcePartitionOffsetEnd(), req.getBuildType(),
                 req.isForce());
     }
 
-    private JobInstance buildInternal(String cubeName, long startTime, long 
endTime, //
-            long startOffset, long endOffset, Map<Integer, Long> 
sourcePartitionOffsetStart,
-            Map<Integer, Long> sourcePartitionOffsetEnd, String buildType, 
boolean force) {
+    private JobInstance buildInternal(String cubeName, TSRange tsRange, 
SegmentRange segRange, //
+            Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> 
sourcePartitionOffsetEnd,
+            String buildType, boolean force) {
         try {
             String submitter = 
SecurityContextHolder.getContext().getAuthentication().getName();
             CubeInstance cube = jobService.getCubeManager().getCube(cubeName);
@@ -312,9 +314,8 @@ public class CubeController extends BasicController {
             if (cube == null) {
                 throw new InternalErrorException("Cannot find cube " + 
cubeName);
             }
-            return jobService.submitJob(cube, startTime, endTime, startOffset, 
endOffset, //
-                    sourcePartitionOffsetStart, sourcePartitionOffsetEnd, 
CubeBuildTypeEnum.valueOf(buildType), force,
-                    submitter);
+            return jobService.submitJob(cube, tsRange, segRange, 
sourcePartitionOffsetStart, sourcePartitionOffsetEnd,
+                    CubeBuildTypeEnum.valueOf(buildType), force, submitter);
         } catch (Throwable e) {
             logger.error(e.getLocalizedMessage(), e);
             throw new InternalErrorException(e.getLocalizedMessage(), e);
@@ -581,14 +582,14 @@ public class CubeController extends BasicController {
             }
 
             hr.setTableName(tableName);
-            hr.setDateRangeStart(segment.getDateRangeStart());
-            hr.setDateRangeEnd(segment.getDateRangeEnd());
+            hr.setDateRangeStart(segment.getTSRange().start.v);
+            hr.setDateRangeEnd(segment.getTSRange().end.v);
             hr.setSegmentName(segment.getName());
             hr.setSegmentStatus(segment.getStatus().toString());
             hr.setSourceCount(segment.getInputRecords());
-            if (segment.isSourceOffsetsOn()) {
-                hr.setSourceOffsetStart(segment.getSourceOffsetStart());
-                hr.setSourceOffsetEnd(segment.getSourceOffsetEnd());
+            if (segment.isOffsetCube()) {
+                hr.setSourceOffsetStart((Long) segment.getSegRange().start.v);
+                hr.setSourceOffsetEnd((Long) segment.getSegRange().end.v);
             }
             hbase.add(hr);
         }
@@ -627,14 +628,13 @@ public class CubeController extends BasicController {
             return jobs;
         }
 
-        boolean isOffsetOn = holes.get(0).isSourceOffsetsOn();
         for (CubeSegment hole : holes) {
-            if (isOffsetOn == true) {
+            if (hole.isOffsetCube()) {
                 JobBuildRequest2 request = new JobBuildRequest2();
                 request.setBuildType(CubeBuildTypeEnum.BUILD.toString());
-                request.setSourceOffsetStart(hole.getSourceOffsetStart());
+                request.setSourceOffsetStart((Long) 
hole.getSegRange().start.v);
+                request.setSourceOffsetEnd((Long) hole.getSegRange().end.v);
                 
request.setSourcePartitionOffsetStart(hole.getSourcePartitionOffsetStart());
-                request.setSourceOffsetEnd(hole.getSourceOffsetEnd());
                 
request.setSourcePartitionOffsetEnd(hole.getSourcePartitionOffsetEnd());
                 try {
                     JobInstance job = build2(cubeName, request);
@@ -647,8 +647,8 @@ public class CubeController extends BasicController {
             } else {
                 JobBuildRequest request = new JobBuildRequest();
                 request.setBuildType(CubeBuildTypeEnum.BUILD.toString());
-                request.setStartTime(hole.getDateRangeStart());
-                request.setEndTime(hole.getDateRangeEnd());
+                request.setStartTime(hole.getTSRange().start.v);
+                request.setEndTime(hole.getTSRange().end.v);
 
                 try {
                     JobInstance job = build(cubeName, request);

http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/server-base/src/main/java/org/apache/kylin/rest/controller2/CubeControllerV2.java
----------------------------------------------------------------------
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/controller2/CubeControllerV2.java
 
b/server-base/src/main/java/org/apache/kylin/rest/controller2/CubeControllerV2.java
index f8d5f83..292b633 100644
--- 
a/server-base/src/main/java/org/apache/kylin/rest/controller2/CubeControllerV2.java
+++ 
b/server-base/src/main/java/org/apache/kylin/rest/controller2/CubeControllerV2.java
@@ -37,6 +37,8 @@ import org.apache.kylin.job.JoinedFlatTable;
 import org.apache.kylin.metadata.draft.Draft;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.ISourceAware;
+import org.apache.kylin.metadata.model.SegmentRange;
+import org.apache.kylin.metadata.model.SegmentRange.TSRange;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.rest.controller.BasicController;
 import org.apache.kylin.rest.exception.BadRequestException;
@@ -355,8 +357,8 @@ public class CubeControllerV2 extends BasicController {
             throws IOException {
 
         return new EnvelopeResponse(ResponseCode.CODE_SUCCESS,
-                buildInternalV2(cubeName, req.getStartTime(), 
req.getEndTime(), 0, 0, null, null, req.getBuildType(),
-                        req.isForce() || req.isForceMergeEmptySegment()),
+                buildInternalV2(cubeName, new TSRange(req.getStartTime(), 
req.getEndTime()), null, null, null,
+                        req.getBuildType(), req.isForce() || 
req.isForceMergeEmptySegment()),
                 "");
     }
 
@@ -392,15 +394,15 @@ public class CubeControllerV2 extends BasicController {
             throws IOException {
 
         return new EnvelopeResponse(ResponseCode.CODE_SUCCESS,
-                buildInternalV2(cubeName, 0, 0, req.getSourceOffsetStart(), 
req.getSourceOffsetEnd(),
+                buildInternalV2(cubeName, null, new 
SegmentRange(req.getSourceOffsetStart(), req.getSourceOffsetEnd()),
                         req.getSourcePartitionOffsetStart(), 
req.getSourcePartitionOffsetEnd(), req.getBuildType(),
                         req.isForce()),
                 "");
     }
 
-    private JobInstance buildInternalV2(String cubeName, long startTime, long 
endTime, //
-            long startOffset, long endOffset, Map<Integer, Long> 
sourcePartitionOffsetStart,
-            Map<Integer, Long> sourcePartitionOffsetEnd, String buildType, 
boolean force) throws IOException {
+    private JobInstance buildInternalV2(String cubeName, TSRange tsRange, 
SegmentRange segRange, //
+            Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> 
sourcePartitionOffsetEnd,
+            String buildType, boolean force) throws IOException {
         Message msg = MsgPicker.getMsg();
 
         String submitter = 
SecurityContextHolder.getContext().getAuthentication().getName();
@@ -412,9 +414,8 @@ public class CubeControllerV2 extends BasicController {
         if (cube.getDescriptor().isDraft()) {
             throw new BadRequestException(msg.getBUILD_DRAFT_CUBE());
         }
-        return jobService.submitJob(cube, startTime, endTime, startOffset, 
endOffset, //
-                sourcePartitionOffsetStart, sourcePartitionOffsetEnd, 
CubeBuildTypeEnum.valueOf(buildType), force,
-                submitter);
+        return jobService.submitJob(cube, tsRange, segRange, 
sourcePartitionOffsetStart, sourcePartitionOffsetEnd,
+                CubeBuildTypeEnum.valueOf(buildType), force, submitter);
     }
 
     @RequestMapping(value = "/{cubeName}/purge", method = { RequestMethod.PUT 
}, produces = {
@@ -470,15 +471,15 @@ public class CubeControllerV2 extends BasicController {
             }
 
             hr.setTableName(tableName);
-            hr.setDateRangeStart(segment.getDateRangeStart());
-            hr.setDateRangeEnd(segment.getDateRangeEnd());
+            hr.setDateRangeStart(segment.getTSRange().start.v);
+            hr.setDateRangeEnd(segment.getTSRange().end.v);
             hr.setSegmentName(segment.getName());
             hr.setSegmentUUID(segment.getUuid());
             hr.setSegmentStatus(segment.getStatus().toString());
             hr.setSourceCount(segment.getInputRecords());
-            if (segment.isSourceOffsetsOn()) {
-                hr.setSourceOffsetStart(segment.getSourceOffsetStart());
-                hr.setSourceOffsetEnd(segment.getSourceOffsetEnd());
+            if (segment.isOffsetCube()) {
+                hr.setSourceOffsetStart((Long) segment.getSegRange().start.v);
+                hr.setSourceOffsetEnd((Long) segment.getSegRange().end.v);
             }
             hbase.add(hr);
         }
@@ -524,14 +525,13 @@ public class CubeControllerV2 extends BasicController {
             return new EnvelopeResponse(ResponseCode.CODE_SUCCESS, jobs, "");
         }
 
-        boolean isOffsetOn = holes.get(0).isSourceOffsetsOn();
         for (CubeSegment hole : holes) {
-            if (isOffsetOn == true) {
+            if (hole.isOffsetCube()) {
                 JobBuildRequest2 request = new JobBuildRequest2();
                 request.setBuildType(CubeBuildTypeEnum.BUILD.toString());
-                request.setSourceOffsetStart(hole.getSourceOffsetStart());
+                request.setSourceOffsetStart((Long) 
hole.getSegRange().start.v);
+                request.setSourceOffsetEnd((Long) hole.getSegRange().end.v);
                 
request.setSourcePartitionOffsetStart(hole.getSourcePartitionOffsetStart());
-                request.setSourceOffsetEnd(hole.getSourceOffsetEnd());
                 
request.setSourcePartitionOffsetEnd(hole.getSourcePartitionOffsetEnd());
                 try {
                     JobInstance job = (JobInstance) build2V2(cubeName, 
request).data;
@@ -544,8 +544,8 @@ public class CubeControllerV2 extends BasicController {
             } else {
                 JobBuildRequest request = new JobBuildRequest();
                 request.setBuildType(CubeBuildTypeEnum.BUILD.toString());
-                request.setStartTime(hole.getDateRangeStart());
-                request.setEndTime(hole.getDateRangeEnd());
+                request.setStartTime(hole.getTSRange().start.v);
+                request.setEndTime(hole.getTSRange().end.v);
 
                 try {
                     JobInstance job = (JobInstance) buildV2(cubeName, 
request).data;

http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/server-base/src/main/java/org/apache/kylin/rest/job/HybridCubeCLI.java
----------------------------------------------------------------------
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/job/HybridCubeCLI.java 
b/server-base/src/main/java/org/apache/kylin/rest/job/HybridCubeCLI.java
index 64fcbc8..875e978 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/job/HybridCubeCLI.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/job/HybridCubeCLI.java
@@ -18,6 +18,10 @@
 
 package org.apache.kylin.rest.job;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
@@ -39,10 +43,6 @@ import org.apache.kylin.storage.hybrid.HybridManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
 /**
  * 1. Create new HybridCube
  * bin/kylin.sh org.apache.kylin.tool.HybridCubeCLI -action create -name 
hybrid_name -project project_name -model model_name -cubes cube1,cube2
@@ -194,12 +194,12 @@ public class HybridCubeCLI extends AbstractApplication {
             if (segment == null)
                 continue;
             if (lastOffset == -1) {
-                lastOffset = segment.getSourceOffsetEnd();
+                lastOffset = (Long) segment.getSegRange().end.v;
             } else {
-                if (lastOffset > segment.getSourceOffsetStart()) {
-                    throw new RuntimeException("Segments has overlap, could 
not hybrid. Last Segment End: " + lastOffset + ", Next Segment Start: " + 
segment.getSourceOffsetStart());
+                if (lastOffset > (Long) segment.getSegRange().start.v) {
+                    throw new RuntimeException("Segments has overlap, could 
not hybrid. Last Segment End: " + lastOffset + ", Next Segment Start: " + 
segment.getSegRange().start.v);
                 }
-                lastOffset = segment.getSourceOffsetEnd();
+                lastOffset = (Long) segment.getSegRange().end.v;
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
----------------------------------------------------------------------
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java 
b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
index fe8fb6c..e79bab9 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -28,7 +28,6 @@ import java.util.List;
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
-import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
@@ -43,6 +42,7 @@ import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.metadata.cachesync.Broadcaster;
 import org.apache.kylin.metadata.draft.Draft;
 import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
@@ -537,11 +537,11 @@ public class CubeService extends BasicService implements 
InitializingBean {
                 return;
 
             List<CubeSegment> toRemoveSegs = Lists.newArrayList();
-            long tail = readySegs.get(readySegs.size() - 1).getDateRangeEnd();
+            long tail = readySegs.get(readySegs.size() - 1).getTSRange().end.v;
             long head = tail - desc.getRetentionRange();
             for (CubeSegment seg : readySegs) {
-                if (seg.getDateRangeEnd() > 0) { // for streaming cube its 
initial value is 0
-                    if (seg.getDateRangeEnd() <= head) {
+                if (seg.getTSRange().end.v > 0) { // for streaming cube its 
initial value is 0
+                    if (seg.getTSRange().end.v <= head) {
                         toRemoveSegs.add(seg);
                     }
                 }
@@ -567,10 +567,9 @@ public class CubeService extends BasicService implements 
InitializingBean {
         synchronized (CubeService.class) {
             try {
                 cube = getCubeManager().getCube(cubeName);
-                Pair<Long, Long> offsets = 
getCubeManager().autoMergeCubeSegments(cube);
+                SegmentRange offsets = cube.autoMergeCubeSegments();
                 if (offsets != null) {
-                    CubeSegment newSeg = getCubeManager().mergeSegments(cube, 
0, 0, offsets.getFirst(),
-                            offsets.getSecond(), true);
+                    CubeSegment newSeg = getCubeManager().mergeSegments(cube, 
null, offsets, true);
                     logger.debug("Will submit merge job on " + newSeg);
                     DefaultChainedExecutable job = 
EngineFactory.createBatchMergeJob(newSeg, "SYSTEM");
                     getExecutableManager().addJob(job);

http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java 
b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 8257d4c..d2180a7 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -18,12 +18,18 @@
 
 package org.apache.kylin.rest.service;
 
-import com.google.common.base.Function;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Date;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimeZone;
+
+import javax.annotation.Nullable;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.directory.api.util.Strings;
 import org.apache.kylin.common.KylinConfig;
@@ -48,6 +54,8 @@ import 
org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.Output;
 import org.apache.kylin.job.lock.JobLock;
+import org.apache.kylin.metadata.model.SegmentRange;
+import org.apache.kylin.metadata.model.SegmentRange.TSRange;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.rest.exception.BadRequestException;
@@ -65,16 +73,12 @@ import 
org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.context.annotation.EnableAspectJAutoProxy;
 import org.springframework.stereotype.Component;
 
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.Date;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TimeZone;
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 /**
  * @author ysong1
@@ -200,11 +204,11 @@ public class JobService extends BasicService implements 
InitializingBean {
         }
     }
 
-    public JobInstance submitJob(CubeInstance cube, long startDate, long 
endDate, long startOffset, long endOffset, //
+    public JobInstance submitJob(CubeInstance cube, TSRange tsRange, 
SegmentRange segRange, //
             Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> 
sourcePartitionOffsetEnd,
             CubeBuildTypeEnum buildType, boolean force, String submitter) 
throws IOException {
         aclEvaluate.checkProjectOperationPermission(cube);
-        JobInstance jobInstance = submitJobInternal(cube, startDate, endDate, 
startOffset, endOffset, sourcePartitionOffsetStart,
+        JobInstance jobInstance = submitJobInternal(cube, tsRange, segRange, 
sourcePartitionOffsetStart,
                 sourcePartitionOffsetEnd, buildType, force, submitter);
 
         accessService.init(jobInstance, null);
@@ -213,9 +217,8 @@ public class JobService extends BasicService implements 
InitializingBean {
         return jobInstance;
     }
 
-    public JobInstance submitJobInternal(CubeInstance cube, long startDate, 
long endDate, long startOffset,
-                                         long endOffset, //
-                                         Map<Integer, Long> 
sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd,
+    public JobInstance submitJobInternal(CubeInstance cube, TSRange tsRange, 
SegmentRange segRange, //
+                                         Map<Integer, Long> 
sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd, //
                                          CubeBuildTypeEnum buildType, boolean 
force, String submitter) throws IOException {
         Message msg = MsgPicker.getMsg();
 
@@ -230,16 +233,15 @@ public class JobService extends BasicService implements 
InitializingBean {
         try {
             if (buildType == CubeBuildTypeEnum.BUILD) {
                 ISource source = SourceFactory.getSource(cube);
-                SourcePartition sourcePartition = new 
SourcePartition(startDate, endDate, startOffset, endOffset,
-                        sourcePartitionOffsetStart, sourcePartitionOffsetEnd);
-                sourcePartition = 
source.enrichSourcePartitionBeforeBuild(cube, sourcePartition);
-                newSeg = getCubeManager().appendSegment(cube, sourcePartition);
+                SourcePartition src = new SourcePartition(tsRange, segRange, 
sourcePartitionOffsetStart, sourcePartitionOffsetEnd);
+                src = source.enrichSourcePartitionBeforeBuild(cube, src);
+                newSeg = getCubeManager().appendSegment(cube, src);
                 job = EngineFactory.createBatchCubingJob(newSeg, submitter);
             } else if (buildType == CubeBuildTypeEnum.MERGE) {
-                newSeg = getCubeManager().mergeSegments(cube, startDate, 
endDate, startOffset, endOffset, force);
+                newSeg = getCubeManager().mergeSegments(cube, tsRange, 
segRange, force);
                 job = EngineFactory.createBatchMergeJob(newSeg, submitter);
             } else if (buildType == CubeBuildTypeEnum.REFRESH) {
-                newSeg = getCubeManager().refreshSegment(cube, startDate, 
endDate, startOffset, endOffset);
+                newSeg = getCubeManager().refreshSegment(cube, tsRange, 
segRange);
                 job = EngineFactory.createBatchCubingJob(newSeg, submitter);
             } else {
                 throw new 
BadRequestException(String.format(msg.getINVALID_BUILD_TYPE(), buildType));
@@ -335,7 +337,7 @@ public class JobService extends BasicService implements 
InitializingBean {
         final String segmentIds = job.getRelatedSegment();
         for (String segmentId : StringUtils.split(segmentIds)) {
             final CubeSegment segment = cubeInstance.getSegmentById(segmentId);
-            if (segment != null && (segment.getStatus() == 
SegmentStatusEnum.NEW || segment.getDateRangeEnd() == 0)) {
+            if (segment != null && (segment.getStatus() == 
SegmentStatusEnum.NEW || segment.getTSRange().end.v == 0)) {
                 // Remove this segments
                 CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance);
                 cubeBuilder.setToRemoveSegs(segment);

http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/server-base/src/main/java/org/apache/kylin/rest/service/ModelService.java
----------------------------------------------------------------------
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/service/ModelService.java 
b/server-base/src/main/java/org/apache/kylin/rest/service/ModelService.java
index 42cf085..61c8bff 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/ModelService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/ModelService.java
@@ -42,6 +42,7 @@ import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.ISegment;
 import org.apache.kylin.metadata.model.JoinsTree;
 import org.apache.kylin.metadata.model.ModelDimensionDesc;
+import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.project.ProjectInstance;
@@ -257,13 +258,8 @@ public class ModelService extends BasicService {
                 }
 
                 @Override
-                public long getSourceOffsetStart() {
-                    return 0;
-                }
-
-                @Override
-                public long getSourceOffsetEnd() {
-                    return 0;
+                public SegmentRange getSegRange() {
+                    return null;
                 }
 
                 @Override

http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/server/src/test/java/org/apache/kylin/rest/controller/CubeControllerTest.java
----------------------------------------------------------------------
diff --git 
a/server/src/test/java/org/apache/kylin/rest/controller/CubeControllerTest.java 
b/server/src/test/java/org/apache/kylin/rest/controller/CubeControllerTest.java
index e67c238..2c91d90 100644
--- 
a/server/src/test/java/org/apache/kylin/rest/controller/CubeControllerTest.java
+++ 
b/server/src/test/java/org/apache/kylin/rest/controller/CubeControllerTest.java
@@ -25,6 +25,7 @@ import java.util.List;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.metadata.model.SegmentRange.TSRange;
 import org.apache.kylin.rest.exception.InternalErrorException;
 import org.apache.kylin.rest.request.CubeRequest;
 import org.apache.kylin.rest.service.CubeService;
@@ -35,10 +36,10 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
-import org.springframework.beans.factory.annotation.Qualifier;
 
 /**
  * @author xduo
@@ -179,10 +180,10 @@ public class CubeControllerTest extends ServiceTestBase {
         CubeInstance cube = cubeService.getCubeManager().getCube(cubeName);
         List<CubeSegment> segments = cube.getSegments();
 
-        final long dateEnd = segments.get(segments.size() 
-1).getDateRangeEnd();
+        final long dateEnd = segments.get(segments.size() 
-1).getTSRange().end.v;
 
         final long ONEDAY = 24 * 60 * 60000;
-        cubeService.getCubeManager().appendSegment(cube, dateEnd + ONEDAY, 
dateEnd + ONEDAY * 2);
+        cubeService.getCubeManager().appendSegment(cube, new TSRange(dateEnd + 
ONEDAY, dateEnd + ONEDAY * 2));
 
         List<CubeSegment> holes = cubeController.getHoles(cubeName);
 
@@ -190,8 +191,7 @@ public class CubeControllerTest extends ServiceTestBase {
 
         CubeSegment hole = holes.get(0);
 
-        Assert.assertTrue(hole.getDateRangeStart() == dateEnd && 
hole.getDateRangeEnd() == (dateEnd + ONEDAY));
-
+        Assert.assertTrue(hole.getTSRange().equals(new TSRange(dateEnd, 
dateEnd + ONEDAY)));
     }
 
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
----------------------------------------------------------------------
diff --git 
a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java 
b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
index d059883..704e45d 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
@@ -40,6 +40,7 @@ import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.cachesync.Broadcaster;
 import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.JoinTableDesc;
+import org.apache.kylin.metadata.model.SegmentRange.TSRange;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
@@ -238,7 +239,7 @@ public class CacheServiceTest extends 
LocalFileMetadataTestCase {
         CubeInstance cube = cubeManager.getCube(cubeName);
         assertEquals(0, cube.getSegments().size());
         assertEquals(0, cubeManagerB.getCube(cubeName).getSegments().size());
-        CubeSegment segment = cubeManager.appendSegment(cube, 0, 1000);
+        CubeSegment segment = cubeManager.appendSegment(cube, new TSRange(0L, 
1000L));
         //one for cube update
         assertEquals(1, broadcaster.getCounterAndClear());
         waitForCounterAndClear(1);

http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
----------------------------------------------------------------------
diff --git 
a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java 
b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
index 6e45406..2fa2fdc 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
@@ -64,8 +64,7 @@ public class HiveSource implements ISource {
     @Override
     public SourcePartition enrichSourcePartitionBeforeBuild(IBuildable 
buildable, SourcePartition srcPartition) {
         SourcePartition result = SourcePartition.getCopyOf(srcPartition);
-        result.setStartOffset(0);
-        result.setEndOffset(0);
+        result.setSegRange(null);
         return result;
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java
----------------------------------------------------------------------
diff --git 
a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java 
b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java
index cd66837..5e06f90 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java
@@ -53,8 +53,7 @@ public class JdbcSource implements ISource {
     @Override
     public SourcePartition enrichSourcePartitionBeforeBuild(IBuildable 
buildable, SourcePartition srcPartition) {
         SourcePartition result = SourcePartition.getCopyOf(srcPartition);
-        result.setStartOffset(0);
-        result.setEndOffset(0);
+        result.setSegRange(null);
         return result;
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index 161099e..fe07b6f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -29,6 +29,7 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.metadata.model.IBuildable;
+import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TableExtDesc;
 import org.apache.kylin.metadata.streaming.StreamingConfig;
@@ -68,8 +69,9 @@ public class KafkaSource implements ISource {
     public SourcePartition enrichSourcePartitionBeforeBuild(IBuildable 
buildable, SourcePartition srcPartition) {
         checkSourceOffsets(srcPartition);
         final SourcePartition result = SourcePartition.getCopyOf(srcPartition);
+        final SegmentRange range = result.getSegRange();
         final CubeInstance cube = (CubeInstance) buildable;
-        if (result.getStartOffset() == 0) {
+        if (range == null || range.start.v.equals(0L)) {
             final CubeSegment last = cube.getLastSegment();
             if (last != null) {
                 logger.debug("Last segment exists, continue from last segment 
" + last.getName() + "'s end position: " + last.getSourcePartitionOffsetEnd());
@@ -101,7 +103,7 @@ public class KafkaSource implements ISource {
             }
         }
 
-        if (result.getEndOffset() == Long.MAX_VALUE) {
+        if (range == null || range.end.v.equals(Long.MAX_VALUE)) {
             logger.debug("Seek end offsets from topic");
             Map<Integer, Long> latestOffsets = 
KafkaClient.getLatestOffsets(cube);
             logger.debug("The end offsets are " + latestOffsets);
@@ -134,18 +136,20 @@ public class KafkaSource implements ISource {
             throw new IllegalArgumentException("No new message comes, 
startOffset = endOffset:" + totalStartOffset);
         }
 
-        result.setStartOffset(totalStartOffset);
-        result.setEndOffset(totalEndOffset);
+        result.setSegRange(new SegmentRange(totalStartOffset, totalEndOffset));
 
         logger.debug("parsePartitionBeforeBuild() return: " + result);
         return result;
     }
 
-    private void checkSourceOffsets(SourcePartition srcPartition) {
-        long startOffset = srcPartition.getStartOffset();
-        long endOffset = srcPartition.getEndOffset();
-        final Map<Integer, Long> sourcePartitionOffsetStart = 
srcPartition.getSourcePartitionOffsetStart();
-        final Map<Integer, Long> sourcePartitionOffsetEnd = 
srcPartition.getSourcePartitionOffsetEnd();
+    private void checkSourceOffsets(SourcePartition src) {
+        if (src.getSegRange() == null)
+            return;
+        
+        long startOffset = (Long) src.getSegRange().start.v;
+        long endOffset = (Long) src.getSegRange().end.v;
+        final Map<Integer, Long> sourcePartitionOffsetStart = 
src.getSourcePartitionOffsetStart();
+        final Map<Integer, Long> sourcePartitionOffsetEnd = 
src.getSourcePartitionOffsetEnd();
         if (endOffset <= 0 || startOffset >= endOffset) {
             throw new IllegalArgumentException("'startOffset' need be smaller 
than 'endOffset'");
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
----------------------------------------------------------------------
diff --git 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
index 914fca2..e3a7586 100644
--- 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
+++ 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
@@ -19,9 +19,7 @@ package org.apache.kylin.source.kafka.job;
 
 import java.io.IOException;
 import java.util.Collections;
-import java.util.List;
 
-import com.google.common.base.Preconditions;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
@@ -31,9 +29,14 @@ import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.model.SegmentRange;
+import org.apache.kylin.metadata.model.SegmentRange.TSRange;
+import org.apache.kylin.metadata.model.Segments;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
 /**
  */
 public class MergeOffsetStep extends AbstractExecutable {
@@ -52,7 +55,7 @@ public class MergeOffsetStep extends AbstractExecutable {
         final CubeSegment segment = cube.getSegmentById(segmentId);
 
         Preconditions.checkNotNull(segment, "Cube segment '" + segmentId + "' 
not found.");
-        List<CubeSegment> mergingSegs = cube.getMergingSegments(segment);
+        Segments<CubeSegment> mergingSegs = cube.getMergingSegments(segment);
 
         Preconditions.checkArgument(mergingSegs.size() > 0, "Merging segment 
not exist.");
 
@@ -60,16 +63,11 @@ public class MergeOffsetStep extends AbstractExecutable {
         final CubeSegment first = mergingSegs.get(0);
         final CubeSegment last = mergingSegs.get(mergingSegs.size() - 1);
 
-        segment.setSourceOffsetStart(first.getSourceOffsetStart());
+        segment.setSegRange(new SegmentRange(first.getSegRange().start, 
last.getSegRange().end));
         
segment.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetStart());
-        segment.setSourceOffsetEnd(last.getSourceOffsetEnd());
         
segment.setSourcePartitionOffsetEnd(last.getSourcePartitionOffsetEnd());
 
-        long dateRangeStart = CubeManager.minDateRangeStart(mergingSegs);
-        long dateRangeEnd = CubeManager.maxDateRangeEnd(mergingSegs);
-
-        segment.setDateRangeStart(dateRangeStart);
-        segment.setDateRangeEnd(dateRangeEnd);
+        segment.setTSRange(new TSRange(mergingSegs.getTSStart(), 
mergingSegs.getTSEnd()));
 
         CubeUpdate cubeBuilder = new CubeUpdate(cube);
         cubeBuilder.setToUpdateSegs(segment);

http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java
index 1cdb2f8..ff8c487 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java
@@ -156,17 +156,17 @@ public class ExtendCubeToHybridCLI {
         CubeSegment currentSeg = null;
         while (segmentIterator.hasNext()) {
             currentSeg = segmentIterator.next();
-            if (partitionDateStr != null && (currentSeg.getDateRangeStart() >= 
partitionDate || currentSeg.getDateRangeEnd() > partitionDate)) {
+            if (partitionDateStr != null && (currentSeg.getTSRange().start.v 
>= partitionDate || currentSeg.getTSRange().end.v > partitionDate)) {
                 segmentIterator.remove();
                 logger.info("CubeSegment[" + currentSeg + "] was removed.");
             }
         }
-        if (partitionDateStr != null && partitionDate != 
currentSeg.getDateRangeEnd()) {
+        if (partitionDateStr != null && partitionDate != 
currentSeg.getTSRange().end.v) {
             logger.error("PartitionDate must be end date of one segment.");
             return;
         }
         if (currentSeg != null && partitionDateStr == null)
-            partitionDate = currentSeg.getDateRangeEnd();
+            partitionDate = currentSeg.getTSRange().end.v;
 
         cubeManager.createCube(newCubeInstance, projectName, owner);
         logger.info("CubeInstance was saved at: " + 
newCubeInstance.getResourcePath());

http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java
----------------------------------------------------------------------
diff --git 
a/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java 
b/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java
index f52fc3e..d048dd7 100644
--- a/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java
@@ -152,17 +152,17 @@ public class ExtendCubeToHybridCLI {
         CubeSegment currentSeg = null;
         while (segmentIterator.hasNext()) {
             currentSeg = segmentIterator.next();
-            if (partitionDateStr != null && (currentSeg.getDateRangeStart() >= 
partitionDate || currentSeg.getDateRangeEnd() > partitionDate)) {
+            if (partitionDateStr != null && (currentSeg.getTSRange().start.v 
>= partitionDate || currentSeg.getTSRange().end.v > partitionDate)) {
                 segmentIterator.remove();
                 logger.info("CubeSegment[" + currentSeg + "] was removed.");
             }
         }
-        if (currentSeg != null && partitionDateStr != null && partitionDate != 
currentSeg.getDateRangeEnd()) {
+        if (currentSeg != null && partitionDateStr != null && partitionDate != 
currentSeg.getTSRange().end.v) {
             logger.error("PartitionDate must be end date of one segment.");
             return;
         }
         if (currentSeg != null && partitionDateStr == null)
-            partitionDate = currentSeg.getDateRangeEnd();
+            partitionDate = currentSeg.getTSRange().end.v;
 
         cubeManager.createCube(newCubeInstance, projectName, owner);
         logger.info("CubeInstance was saved at: " + 
newCubeInstance.getResourcePath());

Reply via email to