http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --cc core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 17e3aa5,e5ec2d7..1fbbbe6
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@@ -43,7 -45,7 +43,6 @@@ import org.apache.kylin.common.util.Dic
  import org.apache.kylin.common.util.Pair;
  import org.apache.kylin.cube.cuboid.Cuboid;
  import org.apache.kylin.cube.model.CubeDesc;
--import org.apache.kylin.cube.model.DictionaryDesc;
  import org.apache.kylin.dict.DictionaryInfo;
  import org.apache.kylin.dict.DictionaryManager;
  import org.apache.kylin.dict.lookup.LookupStringTable;
@@@ -378,632 -445,555 +378,694 @@@ public class CubeManager implements IRe
          return cube;
      }
  
 -    // append a full build segment
 -    public CubeSegment appendSegment(CubeInstance cube) throws IOException {
 -        return appendSegment(cube, null, null, null, null);
 -    }
 -
 -    public CubeSegment appendSegment(CubeInstance cube, TSRange tsRange) throws IOException {
 -        return appendSegment(cube, tsRange, null, null, null);
 +    // for test
 +    CubeInstance reloadCube(String cubeName) {
 +        try (AutoLock lock = cubeMapLock.lockForWrite()) {
 +            return crud.reload(cubeName);
 +        }
      }
  
 -    public CubeSegment appendSegment(CubeInstance cube, SourcePartition src) throws IOException {
 -        return appendSegment(cube, src.getTSRange(), src.getSegRange(), src.getSourcePartitionOffsetStart(),
 -                src.getSourcePartitionOffsetEnd());
 +    // for internal
 +    CubeInstance reloadCubeQuietly(String cubeName) {
 +        try (AutoLock lock = cubeMapLock.lockForWrite()) {
 +            CubeInstance cube = crud.reloadQuietly(cubeName);
 +            if (cube != null)
 +                Cuboid.clearCache(cube);
 +            return cube;
 +        }
      }
  
 -    CubeSegment appendSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange,
 -            Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd)
 -            throws IOException {
 -        checkInputRanges(tsRange, segRange);
 -        checkBuildingSegment(cube);
 -
 -        // fix start/end a bit
 -        if (cube.getModel().getPartitionDesc().isPartitioned()) {
 -            // if missing start, set it to where last time ends
 -            CubeSegment last = cube.getLastSegment();
 -            CubeDesc cubeDesc = cube.getDescriptor();
 -            if (tsRange != null && tsRange.start.v == 0) {
 -                if (last == null) {
 -                    tsRange = new TSRange(cubeDesc.getPartitionDateStart(), tsRange.end.v);
 -                } else if (!last.isOffsetCube()) {
 -                    tsRange = new TSRange(last.getTSRange().end.v, tsRange.end.v);
 +    public void removeCubeLocal(String cubeName) {
 +        try (AutoLock lock = cubeMapLock.lockForWrite()) {
 +            CubeInstance cube = cubeMap.get(cubeName);
 +            if (cube != null) {
 +                cubeMap.removeLocal(cubeName);
 +                for (CubeSegment segment : cube.getSegments()) {
 +                    usedStorageLocation.remove(segment.getUuid());
                  }
 +                Cuboid.clearCache(cube);
              }
 -        } else {
 -            // full build
 -            tsRange = null;
 -            segRange = null;
          }
 +    }
  
 -        CubeSegment newSegment = newSegment(cube, tsRange, segRange);
 -        newSegment.setSourcePartitionOffsetStart(sourcePartitionOffsetStart);
 -        newSegment.setSourcePartitionOffsetEnd(sourcePartitionOffsetEnd);
 -        validateNewSegments(cube, newSegment);
 +    public CubeInstance dropCube(String cubeName, boolean deleteDesc) throws IOException {
 +        try (AutoLock lock = cubeMapLock.lockForWrite()) {
 +            logger.info("Dropping cube '" + cubeName + "'");
 +            // load projects before removing the cube from them
  
 -        CubeUpdate cubeBuilder = new CubeUpdate(cube);
 -        cubeBuilder.setToAddSegs(newSegment);
 -        updateCube(cubeBuilder);
 -        return newSegment;
 -    }
 +            // delete cube instance and cube desc
 +            CubeInstance cube = getCube(cubeName);
  
 -    public CubeSegment refreshSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange) throws IOException {
 -        checkInputRanges(tsRange, segRange);
 -        checkBuildingSegment(cube);
 -
 -        CubeSegment newSegment = newSegment(cube, tsRange, segRange);
 -
 -        Pair<Boolean, Boolean> pair = cube.getSegments().fitInSegments(newSegment);
 -        if (pair.getFirst() == false || pair.getSecond() == false)
 -            throw new IllegalArgumentException("The new refreshing segment " + newSegment
 -                    + " does not match any existing segment in cube " + cube);
 -
 -        if (segRange != null) {
 -            CubeSegment toRefreshSeg = null;
 -            for (CubeSegment cubeSegment : cube.getSegments()) {
 -                if (cubeSegment.getSegRange().equals(segRange)) {
 -                    toRefreshSeg = cubeSegment;
 -                    break;
 -                }
 -            }
 +            // remove cube and update cache
 +            crud.delete(cube);
 +            Cuboid.clearCache(cube);
  
 -            if (toRefreshSeg == null) {
 -                throw new IllegalArgumentException("For streaming cube, only one segment can be refreshed at one time");
 +            if (deleteDesc && cube.getDescriptor() != null) {
 +                CubeDescManager.getInstance(config).removeCubeDesc(cube.getDescriptor());
              }
  
 -            newSegment.setSourcePartitionOffsetStart(toRefreshSeg.getSourcePartitionOffsetStart());
 -            newSegment.setSourcePartitionOffsetEnd(toRefreshSeg.getSourcePartitionOffsetEnd());
 +            // delete cube from project
 +            ProjectManager.getInstance(config).removeRealizationsFromProjects(RealizationType.CUBE, cubeName);
 +
 +            return cube;
          }
 +    }
  
 -        CubeUpdate cubeBuilder = new CubeUpdate(cube);
 -        cubeBuilder.setToAddSegs(newSegment);
 -        updateCube(cubeBuilder);
 +    @VisibleForTesting
 +    /*private*/ String generateStorageLocation() {
 +        String namePrefix = config.getHBaseTableNamePrefix();
++        String namespace = config.getHBaseStorageNameSpace();
 +        String tableName = "";
 +        Random ran = new Random();
 +        do {
 +            StringBuffer sb = new StringBuffer();
++            if ((namespace.equals("default") || namespace.equals("")) == false) {
++                sb.append(namespace).append(":");
++            }
 +            sb.append(namePrefix);
 +            for (int i = 0; i < HBASE_TABLE_LENGTH; i++) {
 +                sb.append(ALPHA_NUM.charAt(ran.nextInt(ALPHA_NUM.length())));
 +            }
 +            tableName = sb.toString();
 +        } while (this.usedStorageLocation.containsValue(tableName));
 +        return tableName;
 +    }
  
 -        return newSegment;
 +    public CubeInstance copyForWrite(CubeInstance cube) {
 +        return crud.copyForWrite(cube);
      }
  
 -    public CubeSegment[] optimizeSegments(CubeInstance cube, Set<Long> cuboidsRecommend) throws IOException {
 -        checkReadyForOptimize(cube);
 +    private boolean isReady(CubeSegment seg) {
 +        return seg.getStatus() == SegmentStatusEnum.READY;
 +    }
  
 -        List<CubeSegment> readySegments = cube.getSegments(SegmentStatusEnum.READY);
 -        CubeSegment[] optimizeSegments = new CubeSegment[readySegments.size()];
 -        int i = 0;
 -        for (CubeSegment segment : readySegments) {
 -            CubeSegment newSegment = newSegment(cube, segment.getTSRange(), null);
 -            validateNewSegments(cube, newSegment);
 +    private TableMetadataManager getTableManager() {
 +        return TableMetadataManager.getInstance(config);
 +    }
  
 -            optimizeSegments[i++] = newSegment;
 -        }
 +    private DictionaryManager getDictionaryManager() {
 +        return DictionaryManager.getInstance(config);
 +    }
  
 -        CubeUpdate cubeBuilder = new CubeUpdate(cube);
 -        cubeBuilder.setCuboidsRecommend(cuboidsRecommend);
 -        cubeBuilder.setToAddSegs(optimizeSegments);
 -        updateCube(cubeBuilder);
 +    private SnapshotManager getSnapshotManager() {
 +        return SnapshotManager.getInstance(config);
 +    }
  
 -        return optimizeSegments;
 +    private ResourceStore getStore() {
 +        return ResourceStore.getStore(this.config);
      }
  
 -    public CubeSegment mergeSegments(CubeInstance cube, TSRange tsRange, SegmentRange segRange, boolean force)
 -            throws IOException {
 -        if (cube.getSegments().isEmpty())
 -            throw new IllegalArgumentException("Cube " + cube + " has no segments");
 -
 -        checkInputRanges(tsRange, segRange);
 -        checkBuildingSegment(cube);
 -        checkCubeIsPartitioned(cube);
 -
 -        if (cube.getSegments().getFirstSegment().isOffsetCube()) {
 -            // offset cube, merge by date range?
 -            if (segRange == null && tsRange != null) {
 -                Pair<CubeSegment, CubeSegment> pair = cube.getSegments(SegmentStatusEnum.READY)
 -                        .findMergeOffsetsByDateRange(tsRange, Long.MAX_VALUE);
 -                if (pair == null)
 -                    throw new IllegalArgumentException("Find no segments to merge by " + tsRange + " for cube " + cube);
 -                segRange = new SegmentRange(pair.getFirst().getSegRange().start, pair.getSecond().getSegRange().end);
 -            }
 -            tsRange = null;
 -            Preconditions.checkArgument(segRange != null);
 -        } else {
 -            segRange = null;
 -            Preconditions.checkArgument(tsRange != null);
 -        }
 -
 -        CubeSegment newSegment = newSegment(cube, tsRange, segRange);
 -
 -        Segments<CubeSegment> mergingSegments = cube.getMergingSegments(newSegment);
 -        if (mergingSegments.size() <= 1)
 -            throw new IllegalArgumentException("Range " + newSegment.getSegRange()
 -                    + " must contain at least 2 segments, but there is " + mergingSegments.size());
 -
 -        CubeSegment first = mergingSegments.get(0);
 -        CubeSegment last = mergingSegments.get(mergingSegments.size() - 1);
 -        if (first.isOffsetCube()) {
 -            newSegment.setSegRange(new SegmentRange(first.getSegRange().start, last.getSegRange().end));
 -            newSegment.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetStart());
 -            newSegment.setSourcePartitionOffsetEnd(last.getSourcePartitionOffsetEnd());
 -            newSegment.setTSRange(null);
 -        } else {
 -            newSegment.setTSRange(new TSRange(mergingSegments.getTSStart(), mergingSegments.getTSEnd()));
 -            newSegment.setSegRange(null);
 -        }
 -
 -        if (force == false) {
 -            List<String> emptySegment = Lists.newArrayList();
 -            for (CubeSegment seg : mergingSegments) {
 -                if (seg.getSizeKB() == 0) {
 -                    emptySegment.add(seg.getName());
 -                }
 -            }
 +    @Override
 +    public RealizationType getRealizationType() {
 +        return RealizationType.CUBE;
 +    }
  
 -            if (emptySegment.size() > 0) {
 -                throw new IllegalArgumentException(
 -                        "Empty cube segment found, couldn't merge unless 'forceMergeEmptySegment' set to true: "
 -                                + emptySegment);
 -            }
 -        }
 +    @Override
 +    public IRealization getRealization(String name) {
 +        return getCube(name);
 +    }
  
 -        validateNewSegments(cube, newSegment);
 +    // ============================================================================
 +    // Segment related methods
 +    // ============================================================================
  
 -        CubeUpdate cubeBuilder = new CubeUpdate(cube);
 -        cubeBuilder.setToAddSegs(newSegment);
 -        updateCube(cubeBuilder);
 +    // append a full build segment
 +    public CubeSegment appendSegment(CubeInstance cube) throws IOException {
 +        return appendSegment(cube, null, null, null, null);
 +    }
  
 -        return newSegment;
 +    public CubeSegment appendSegment(CubeInstance cube, TSRange tsRange) throws IOException {
 +        return appendSegment(cube, tsRange, null, null, null);
      }
 -    
 -    private void checkInputRanges(TSRange tsRange, SegmentRange segRange) {
 -        if (tsRange != null && segRange != null) {
 -            throw new IllegalArgumentException("Build or refresh cube segment either by TSRange or by SegmentRange, not both.");
 +
 +    public CubeSegment appendSegment(CubeInstance cube, SourcePartition src) throws IOException {
 +        return appendSegment(cube, src.getTSRange(), src.getSegRange(), src.getSourcePartitionOffsetStart(),
 +                src.getSourcePartitionOffsetEnd());
 +    }
 +
 +    CubeSegment appendSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange,
 +            Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd)
 +            throws IOException {
 +        try (AutoLock lock = cubeMapLock.lockForWrite()) {
 +            return segAssist.appendSegment(cube, tsRange, segRange, sourcePartitionOffsetStart,
 +                    sourcePartitionOffsetEnd);
          }
      }
  
 -    private void checkBuildingSegment(CubeInstance cube) {
 -        checkBuildingSegment(cube, cube.getConfig().getMaxBuildingSegments());
 +    public CubeSegment refreshSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange) throws IOException {
 +        try (AutoLock lock = cubeMapLock.lockForWrite()) {
 +            return segAssist.refreshSegment(cube, tsRange, segRange);
 +        }
      }
  
 -    public void checkReadyForOptimize(CubeInstance cube) {
 -        checkBuildingSegment(cube, 1);
++    public CubeSegment[] optimizeSegments(CubeInstance cube, Set<Long> cuboidsRecommend) throws IOException {
++        try (AutoLock lock = cubeMapLock.lockForWrite()) {
++            return segAssist.optimizeSegments(cube, cuboidsRecommend);
++        }
++    }
++    
 +    public CubeSegment mergeSegments(CubeInstance cube, TSRange tsRange, SegmentRange segRange, boolean force)
 +            throws IOException {
 +        try (AutoLock lock = cubeMapLock.lockForWrite()) {
 +            return segAssist.mergeSegments(cube, tsRange, segRange, force);
 +        }
      }
  
 -    private void checkBuildingSegment(CubeInstance cube, int maxBuildingSeg) {
 -        if (cube.getBuildingSegments().size() >= maxBuildingSeg) {
 -            throw new IllegalStateException(
 -                    "There is already " + cube.getBuildingSegments().size() + " building segment; ");
 +    public void promoteNewlyBuiltSegments(CubeInstance cube, CubeSegment newSegment) throws IOException {
 +        try (AutoLock lock = cubeMapLock.lockForWrite()) {
 +            segAssist.promoteNewlyBuiltSegments(cube, newSegment);
          }
      }
  
-     public void validateNewSegments(CubeInstance cube, CubeSegment 
newSegments) {
-         segAssist.validateNewSegments(cube, newSegments);
 -    private void checkCubeIsPartitioned(CubeInstance cube) {
 -        if (cube.getDescriptor().getModel().getPartitionDesc().isPartitioned() == false) {
 -            throw new IllegalStateException(
 -                    "there is no partition date column specified, only full build is supported");
++    public void promoteNewlyOptimizeSegments(CubeInstance cube, CubeSegment... optimizedSegments) throws IOException {
++        try (AutoLock lock = cubeMapLock.lockForWrite()) {
++            segAssist.promoteNewlyOptimizeSegments(cube, optimizedSegments);
+         }
      }
  
 -    /**
 -     * After cube update, reload cube related cache
 -     *
 -     * @param cubeName
 -     */
 -    public CubeInstance reloadCubeLocal(String cubeName) {
 -        CubeInstance cubeInstance = reloadCubeLocalAt(CubeInstance.concatResourcePath(cubeName));
 -        Cuboid.clearCache(cubeInstance);
 -        return cubeInstance;
++    public void promoteCheckpointOptimizeSegments(CubeInstance cube, Map<Long, Long> recommendCuboids,
++            CubeSegment... optimizedSegments) throws IOException {
++        try (AutoLock lock = cubeMapLock.lockForWrite()) {
++            segAssist.promoteCheckpointOptimizeSegments(cube, recommendCuboids, optimizedSegments);
++        }
++    }
++    
 +    public List<CubeSegment> calculateHoles(String cubeName) {
 +        return segAssist.calculateHoles(cubeName);
      }
  
 -    public void removeCubeLocal(String cubeName) {
 -        CubeInstance cube = cubeMap.get(cubeName);
 -        if (cube != null) {
 -            cubeMap.removeLocal(cubeName);
 -            for (CubeSegment segment : cube.getSegments()) {
 -                usedStorageLocation.remove(segment.getUuid());
 +    private class SegmentAssist {
 +
 +        CubeSegment appendSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange,
 +                Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd)
 +                throws IOException {
 +            CubeInstance cubeCopy = cube.latestCopyForWrite(); // get a latest copy
 +
 +            checkInputRanges(tsRange, segRange);
 +            checkBuildingSegment(cubeCopy);
 +
 +            // fix start/end a bit
 +            if (cubeCopy.getModel().getPartitionDesc().isPartitioned()) {
 +                // if missing start, set it to where last time ends
-                 CubeSegment last = cubeCopy.getLastSegment();
-                 if (last != null && !last.isOffsetCube() && tsRange.start.v == 0) {
-                     tsRange = new TSRange(last.getTSRange().end.v, tsRange.end.v);
++                if (tsRange != null && tsRange.start.v == 0) {
++                    CubeDesc cubeDesc = cubeCopy.getDescriptor();
++                    CubeSegment last = cubeCopy.getLastSegment();
++                    if (last == null)
++                        tsRange = new TSRange(cubeDesc.getPartitionDateStart(), tsRange.end.v);
++                    else if (!last.isOffsetCube())
++                        tsRange = new TSRange(last.getTSRange().end.v, tsRange.end.v);
 +                }
 +            } else {
 +                // full build
 +                tsRange = null;
 +                segRange = null;
              }
 -            Cuboid.clearCache(cube);
 +
 +            CubeSegment newSegment = newSegment(cubeCopy, tsRange, segRange);
 +            newSegment.setSourcePartitionOffsetStart(sourcePartitionOffsetStart);
 +            newSegment.setSourcePartitionOffsetEnd(sourcePartitionOffsetEnd);
 +            validateNewSegments(cubeCopy, newSegment);
 +
 +            CubeUpdate update = new CubeUpdate(cubeCopy);
 +            update.setToAddSegs(newSegment);
 +            updateCube(update);
 +            return newSegment;
          }
 -    }
  
 -    public LookupStringTable getLookupTable(CubeSegment cubeSegment, JoinDesc join) {
 +        public CubeSegment refreshSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange)
 +                throws IOException {
 +            CubeInstance cubeCopy = cube.latestCopyForWrite(); // get a latest copy
  
 -        String tableName = join.getPKSide().getTableIdentity();
 -        String[] pkCols = join.getPrimaryKey();
 -        String snapshotResPath = cubeSegment.getSnapshotResPath(tableName);
 -        if (snapshotResPath == null)
 -            throw new IllegalStateException("No snapshot for table '" + tableName + "' found on cube segment"
 -                    + cubeSegment.getCubeInstance().getName() + "/" + cubeSegment);
 +            checkInputRanges(tsRange, segRange);
 +            checkBuildingSegment(cubeCopy);
  
 -        try {
 -            SnapshotTable snapshot = getSnapshotManager().getSnapshotTable(snapshotResPath);
 -            TableDesc tableDesc = getTableManager().getTableDesc(tableName, cubeSegment.getProject());
 -            return new LookupStringTable(tableDesc, pkCols, snapshot);
 -        } catch (IOException e) {
 -            throw new IllegalStateException(
 -                    "Failed to load lookup table " + tableName + " from snapshot " + snapshotResPath, e);
 +            if (cubeCopy.getModel().getPartitionDesc().isPartitioned() == false) {
 +                // full build
 +                tsRange = null;
 +                segRange = null;
 +            }
 +
 +            CubeSegment newSegment = newSegment(cubeCopy, tsRange, segRange);
 +
 +            Pair<Boolean, Boolean> pair = cubeCopy.getSegments().fitInSegments(newSegment);
 +            if (pair.getFirst() == false || pair.getSecond() == false)
 +                throw new IllegalArgumentException("The new refreshing segment " + newSegment
 +                        + " does not match any existing segment in cube " + cubeCopy);
 +
 +            if (segRange != null) {
 +                CubeSegment toRefreshSeg = null;
 +                for (CubeSegment cubeSegment : cubeCopy.getSegments()) {
 +                    if (cubeSegment.getSegRange().equals(segRange)) {
 +                        toRefreshSeg = cubeSegment;
 +                        break;
 +                    }
 +                }
 +
 +                if (toRefreshSeg == null) {
 +                    throw new IllegalArgumentException(
 +                            "For streaming cube, only one segment can be refreshed at one time");
 +                }
 +
 +                newSegment.setSourcePartitionOffsetStart(toRefreshSeg.getSourcePartitionOffsetStart());
 +                newSegment.setSourcePartitionOffsetEnd(toRefreshSeg.getSourcePartitionOffsetEnd());
 +            }
 +
 +            CubeUpdate update = new CubeUpdate(cubeCopy);
 +            update.setToAddSegs(newSegment);
 +            updateCube(update);
 +
 +            return newSegment;
          }
 -    }
  
 -    private CubeSegment newSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange) {
 -        CubeSegment segment = new CubeSegment();
 -        segment.setUuid(UUID.randomUUID().toString());
 -        segment.setName(CubeSegment.makeSegmentName(tsRange, segRange));
 -        segment.setCreateTimeUTC(System.currentTimeMillis());
 -        segment.setCubeInstance(cube);
 -        
 -        // let full build range be backward compatible
 -        if (tsRange == null && segRange == null)
 -            tsRange = new TSRange(0L, Long.MAX_VALUE);
 -        
 -        segment.setTSRange(tsRange);
 -        segment.setSegRange(segRange);
 -        segment.setStatus(SegmentStatusEnum.NEW);
 -        segment.setStorageLocationIdentifier(generateStorageLocation());
++        public CubeSegment[] optimizeSegments(CubeInstance cube, Set<Long> cuboidsRecommend) throws IOException {
++            checkReadyForOptimize(cube);
+ 
 -        segment.setCubeInstance(cube);
++            List<CubeSegment> readySegments = cube.getSegments(SegmentStatusEnum.READY);
++            CubeSegment[] optimizeSegments = new CubeSegment[readySegments.size()];
++            int i = 0;
++            for (CubeSegment segment : readySegments) {
++                CubeSegment newSegment = newSegment(cube, segment.getTSRange(), null);
++                validateNewSegments(cube, newSegment);
+ 
 -        segment.validate();
 -        return segment;
 -    }
++                optimizeSegments[i++] = newSegment;
++            }
+ 
 -    @VisibleForTesting
 -    /*private*/ String generateStorageLocation() {
 -        String namePrefix = config.getHBaseTableNamePrefix();
 -        String namespace = config.getHBaseStorageNameSpace();
 -        String tableName = "";
 -        Random ran = new Random();
 -        do {
 -            StringBuffer sb = new StringBuffer();
 -            if ((namespace.equals("default") || namespace.equals("")) == false) {
 -                sb.append(namespace).append(":");
++            CubeUpdate update = new CubeUpdate(cube);
++            update.setCuboidsRecommend(cuboidsRecommend);
++            update.setToAddSegs(optimizeSegments);
++            updateCube(update);
++
++            return optimizeSegments;
++        }
++
 +        public CubeSegment mergeSegments(CubeInstance cube, TSRange tsRange, SegmentRange segRange, boolean force)
 +                throws IOException {
 +            CubeInstance cubeCopy = cube.latestCopyForWrite(); // get a latest copy
 +
 +            if (cubeCopy.getSegments().isEmpty())
 +                throw new IllegalArgumentException("Cube " + cubeCopy + " has no segments");
 +
 +            checkInputRanges(tsRange, segRange);
 +            checkBuildingSegment(cubeCopy);
 +            checkCubeIsPartitioned(cubeCopy);
 +
 +            if (cubeCopy.getSegments().getFirstSegment().isOffsetCube()) {
 +                // offset cube, merge by date range?
 +                if (segRange == null && tsRange != null) {
 +                    Pair<CubeSegment, CubeSegment> pair = cubeCopy.getSegments(SegmentStatusEnum.READY)
 +                            .findMergeOffsetsByDateRange(tsRange, Long.MAX_VALUE);
 +                    if (pair == null)
 +                        throw new IllegalArgumentException(
 +                                "Find no segments to merge by " + tsRange + " for cube " + cubeCopy);
 +                    segRange = new SegmentRange(pair.getFirst().getSegRange().start,
 +                            pair.getSecond().getSegRange().end);
 +                }
 +                tsRange = null;
 +                Preconditions.checkArgument(segRange != null);
 +            } else {
 +                segRange = null;
 +                Preconditions.checkArgument(tsRange != null);
              }
 -            sb.append(namePrefix);
 -            for (int i = 0; i < HBASE_TABLE_LENGTH; i++) {
 -                sb.append(ALPHA_NUM.charAt(ran.nextInt(ALPHA_NUM.length())));
 +
 +            CubeSegment newSegment = newSegment(cubeCopy, tsRange, segRange);
 +
 +            Segments<CubeSegment> mergingSegments = cubeCopy.getMergingSegments(newSegment);
 +            if (mergingSegments.size() <= 1)
 +                throw new IllegalArgumentException("Range " + newSegment.getSegRange()
 +                        + " must contain at least 2 segments, but there is " + mergingSegments.size());
 +
 +            CubeSegment first = mergingSegments.get(0);
 +            CubeSegment last = mergingSegments.get(mergingSegments.size() - 1);
 +            if (force == false) {
 +                for (int i = 0; i < mergingSegments.size() - 1; i++) {
 +                    if (!mergingSegments.get(i).getSegRange().connects(mergingSegments.get(i + 1).getSegRange()))
 +                        throw new IllegalStateException("Merging segments must not have gaps between "
 +                                + mergingSegments.get(i) + " and " + mergingSegments.get(i + 1));
 +                }
 +            }
 +            if (first.isOffsetCube()) {
 +                newSegment.setSegRange(new SegmentRange(first.getSegRange().start, last.getSegRange().end));
 +                newSegment.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetStart());
 +                newSegment.setSourcePartitionOffsetEnd(last.getSourcePartitionOffsetEnd());
 +                newSegment.setTSRange(null);
 +            } else {
 +                newSegment.setTSRange(new TSRange(mergingSegments.getTSStart(), mergingSegments.getTSEnd()));
 +                newSegment.setSegRange(null);
              }
 -            tableName = sb.toString();
 -        } while (this.usedStorageLocation.containsValue(tableName));
 -        return tableName;
 -    }
  
 -    public void promoteNewlyBuiltSegments(CubeInstance cube, CubeSegment newSegment) throws IOException {
 -        if (StringUtils.isBlank(newSegment.getStorageLocationIdentifier()))
 -            throw new IllegalStateException(
 -                    "For cube " + cube + ", segment " + newSegment + " missing StorageLocationIdentifier");
 +            if (force == false) {
 +                List<String> emptySegment = Lists.newArrayList();
 +                for (CubeSegment seg : mergingSegments) {
 +                    if (seg.getSizeKB() == 0) {
 +                        emptySegment.add(seg.getName());
 +                    }
 +                }
  
 -        if (StringUtils.isBlank(newSegment.getLastBuildJobID()))
 -            throw new IllegalStateException("For cube " + cube + ", segment " + newSegment + " missing LastBuildJobID");
 +                if (emptySegment.size() > 0) {
 +                    throw new IllegalArgumentException(
 +                            "Empty cube segment found, couldn't merge unless 'forceMergeEmptySegment' set to true: "
 +                                    + emptySegment);
 +                }
 +            }
  
 -        if (isReady(newSegment) == true) {
 -            logger.warn("For cube " + cube + ", segment " + newSegment + " state should be NEW but is READY");
 -        }
 +            validateNewSegments(cubeCopy, newSegment);
  
 -        List<CubeSegment> tobe = cube.calculateToBeSegments(newSegment);
 +            CubeUpdate update = new CubeUpdate(cubeCopy);
 +            update.setToAddSegs(newSegment);
 +            updateCube(update);
  
 -        if (tobe.contains(newSegment) == false)
 -            throw new IllegalStateException(
 -                    "For cube " + cube + ", segment " + newSegment + " is expected but not in the tobe " + tobe);
 +            return newSegment;
 +        }
  
 -        newSegment.setStatus(SegmentStatusEnum.READY);
 +        private void checkInputRanges(TSRange tsRange, SegmentRange segRange) {
 +            if (tsRange != null && segRange != null) {
 +                throw new IllegalArgumentException(
 +                        "Build or refresh cube segment either by TSRange or by SegmentRange, not both.");
 +            }
 +        }
  
 -        List<CubeSegment> toRemoveSegs = Lists.newArrayList();
 -        for (CubeSegment segment : cube.getSegments()) {
 -            if (!tobe.contains(segment))
 -                toRemoveSegs.add(segment);
 +        private void checkBuildingSegment(CubeInstance cube) {
-             int maxBuldingSeg = cube.getConfig().getMaxBuildingSegments();
-             if (cube.getBuildingSegments().size() >= maxBuldingSeg) {
++            checkBuildingSegment(cube, cube.getConfig().getMaxBuildingSegments());
+         }
+ 
 -        logger.info("Promoting cube " + cube + ", new segment " + newSegment + ", to remove segments " + toRemoveSegs);
++        private void checkReadyForOptimize(CubeInstance cube) {
++            checkBuildingSegment(cube, 1);
++        }
+ 
 -        CubeUpdate cubeBuilder = new CubeUpdate(cube);
 -        cubeBuilder.setToRemoveSegs(toRemoveSegs.toArray(new CubeSegment[toRemoveSegs.size()]))
 -                .setToUpdateSegs(newSegment).setStatus(RealizationStatusEnum.READY);
 -        updateCube(cubeBuilder);
 -    }
++        private void checkBuildingSegment(CubeInstance cube, int maxBuildingSeg) {
++            if (cube.getBuildingSegments().size() >= maxBuildingSeg) {
 +                throw new IllegalStateException(
 +                        "There are already " + cube.getBuildingSegments().size() + " building segments.");
 +            }
 +        }
  
 -    public void promoteNewlyOptimizeSegments(CubeInstance cube, CubeSegment... optimizedSegments) throws IOException {
 -        for (CubeSegment seg : optimizedSegments) {
 -            seg.setStatus(SegmentStatusEnum.READY_PENDING);
 +        private void checkCubeIsPartitioned(CubeInstance cube) {
 +            if (cube.getDescriptor().getModel().getPartitionDesc().isPartitioned() == false) {
 +                throw new IllegalStateException(
 +                        "there is no partition date column specified, only full build is supported");
 +            }
          }
  
 -        CubeUpdate cubeBuilder = new CubeUpdate(cube);
 -        cubeBuilder.setToUpdateSegs(optimizedSegments);
 -        updateCube(cubeBuilder);
 -    }
 +        private CubeSegment newSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange) {
 +            DataModelDesc modelDesc = cube.getModel();
  
 -    public void promoteCheckpointOptimizeSegments(CubeInstance cube, Map<Long, Long> recommendCuboids,
 -            CubeSegment... optimizedSegments) throws IOException {
 -        if (cube.getSegments().size() != optimizedSegments.length * 2) {
 -            throw new IllegalStateException("For cube " + cube
 -                    + ", every READY segment should be optimized and all segments should be READY before optimizing");
 +            CubeSegment segment = new CubeSegment();
 +            segment.setUuid(UUID.randomUUID().toString());
 +            segment.setName(CubeSegment.makeSegmentName(tsRange, segRange, modelDesc));
 +            segment.setCreateTimeUTC(System.currentTimeMillis());
 +            segment.setCubeInstance(cube);
 +
 +            // let full build range be backward compatible
 +            if (tsRange == null && segRange == null)
 +                tsRange = new TSRange(0L, Long.MAX_VALUE);
 +
 +            segment.setTSRange(tsRange);
 +            segment.setSegRange(segRange);
 +            segment.setStatus(SegmentStatusEnum.NEW);
 +            segment.setStorageLocationIdentifier(generateStorageLocation());
 +
 +            segment.setCubeInstance(cube);
 +
 +            segment.validate();
 +            return segment;
          }
 -        CubeSegment[] originalSegments = new CubeSegment[optimizedSegments.length];
 -        int i = 0;
 -        for (CubeSegment seg : optimizedSegments) {
 -            originalSegments[i++] = cube.getOriginalSegmentToOptimize(seg);
  
 -            if (StringUtils.isBlank(seg.getStorageLocationIdentifier()))
 +        public void promoteNewlyBuiltSegments(CubeInstance cube, CubeSegment newSegment) throws IOException {
 +            // work on copy instead of cached objects
 +            CubeInstance cubeCopy = cube.latestCopyForWrite(); // get a latest copy
 +            CubeSegment newSegCopy = cubeCopy.getSegmentById(newSegment.getUuid());
 +
 +            if (StringUtils.isBlank(newSegCopy.getStorageLocationIdentifier()))
                 throw new IllegalStateException(
 -                        "For cube " + cube + ", segment " + seg + " missing StorageLocationIdentifier");
 +                        "For cube " + cubeCopy + ", segment " + newSegCopy + " missing StorageLocationIdentifier");
  
 -            if (StringUtils.isBlank(seg.getLastBuildJobID()))
 -                throw new IllegalStateException("For cube " + cube + ", segment " + seg + " missing LastBuildJobID");
 +            if (StringUtils.isBlank(newSegCopy.getLastBuildJobID()))
 +                throw new IllegalStateException(
 +                        "For cube " + cubeCopy + ", segment " + newSegCopy + " missing LastBuildJobID");
  
 -            seg.setStatus(SegmentStatusEnum.READY);
 -        }
 +            if (isReady(newSegCopy) == true) {
 +                logger.warn("For cube " + cubeCopy + ", segment " + newSegCopy + " state should be NEW but is READY");
 +            }
  
 -        logger.info("Promoting cube " + cube + ", new segments " + Arrays.toString(optimizedSegments)
 -                + ", to remove segments " + originalSegments);
 +            List<CubeSegment> tobe = cubeCopy.calculateToBeSegments(newSegCopy);
  
 -        CubeUpdate cubeBuilder = new CubeUpdate(cube);
 -        cubeBuilder.setToRemoveSegs(originalSegments) //
 -                .setToUpdateSegs(optimizedSegments) //
 -                .setStatus(RealizationStatusEnum.READY) //
 -                .setCuboids(recommendCuboids) //
 -                .setCuboidsRecommend(Sets.<Long> newHashSet());
 -        updateCube(cubeBuilder);
 -    }
 +            if (tobe.contains(newSegCopy) == false)
 +                throw new IllegalStateException("For cube " + cubeCopy + ", segment " + newSegCopy
 +                        + " is expected but not in the tobe " + tobe);
  
 -    public void validateNewSegments(CubeInstance cube, CubeSegment newSegments) {
 -        List<CubeSegment> tobe = cube.calculateToBeSegments(newSegments);
 -        List<CubeSegment> newList = Arrays.asList(newSegments);
 -        if (tobe.containsAll(newList) == false) {
 -            throw new IllegalStateException("For cube " + cube + ", the new segments " + newList
 -                    + " do not fit in its current " + cube.getSegments() + "; the resulted tobe is " + tobe);
 -        }
 -    }
 +            newSegCopy.setStatus(SegmentStatusEnum.READY);
  
 -    private boolean isReady(CubeSegment seg) {
 -        return seg.getStatus() == SegmentStatusEnum.READY;
 -    }
 +            List<CubeSegment> toRemoveSegs = Lists.newArrayList();
 +            for (CubeSegment segment : cubeCopy.getSegments()) {
 +                if (!tobe.contains(segment))
 +                    toRemoveSegs.add(segment);
 +            }
  
 -    private void loadAllCubeInstance() throws IOException {
 -        ResourceStore store = getStore();
 -        List<String> paths = store.collectResourceRecursively(ResourceStore.CUBE_RESOURCE_ROOT, ".json");
 +            logger.info("Promoting cube " + cubeCopy + ", new segment " + newSegCopy + ", to remove segments "
 +                    + toRemoveSegs);
  
 -        logger.info("Loading Cube from folder " + store.getReadableResourcePath(ResourceStore.CUBE_RESOURCE_ROOT));
 +            CubeUpdate update = new CubeUpdate(cubeCopy);
 +            update.setToRemoveSegs(toRemoveSegs.toArray(new CubeSegment[toRemoveSegs.size()]))
 +                    .setToUpdateSegs(newSegCopy).setStatus(RealizationStatusEnum.READY);
 +            updateCube(update);
 +        }
  
-         public void validateNewSegments(CubeInstance cube, CubeSegment newSegments) {
 -        int succeed = 0;
 -        int fail = 0;
 -        for (String path : paths) {
 -            CubeInstance cube = reloadCubeLocalAt(path);
 -            if (cube == null) {
 -                fail++;
 -            } else {
 -                succeed++;
++        public void promoteNewlyOptimizeSegments(CubeInstance cube, CubeSegment... optimizedSegments) throws IOException {
++            CubeInstance cubeCopy = cube.latestCopyForWrite();
++            CubeSegment[] segCopy = cube.regetSegments(optimizedSegments);
++            
++            for (CubeSegment seg : segCopy) {
++                seg.setStatus(SegmentStatusEnum.READY_PENDING);
+             }
++
++            CubeUpdate update = new CubeUpdate(cubeCopy);
++            update.setToUpdateSegs(segCopy);
++            updateCube(update);
+         }
+ 
 -        logger.info("Loaded " + succeed + " cubes, fail on " + fail + " cubes");
 -    }
++        public void promoteCheckpointOptimizeSegments(CubeInstance cube, Map<Long, Long> recommendCuboids,
++                CubeSegment... optimizedSegments) throws IOException {
++            CubeInstance cubeCopy = cube.latestCopyForWrite();
++            CubeSegment[] optSegCopy = cubeCopy.regetSegments(optimizedSegments);
++            
++            if (cubeCopy.getSegments().size() != optSegCopy.length * 2) {
++                throw new IllegalStateException("For cube " + cubeCopy
++                        + ", every READY segment should be optimized and all segments should be READY before optimizing");
++            }
++            
++            CubeSegment[] originalSegments = new CubeSegment[optSegCopy.length];
++            int i = 0;
++            for (CubeSegment seg : optSegCopy) {
++                originalSegments[i++] = cubeCopy.getOriginalSegmentToOptimize(seg);
+ 
 -    private CubeInstance reloadCubeLocalAt(String path) {
 -        ResourceStore store = getStore();
 -        CubeInstance cube;
++                if (StringUtils.isBlank(seg.getStorageLocationIdentifier()))
++                    throw new IllegalStateException(
++                            "For cube " + cubeCopy + ", segment " + seg + " missing StorageLocationIdentifier");
+ 
 -        try {
 -            cube = store.getResource(path, CubeInstance.class, CUBE_SERIALIZER);
 -            checkNotNull(cube, "cube (at %s) not found", path);
 -
 -            String cubeName = cube.getName();
 -            checkState(StringUtils.isNotBlank(cubeName), "cube (at %s) name must not be blank", path);
 -
 -            CubeDesc cubeDesc = CubeDescManager.getInstance(config).getCubeDesc(cube.getDescName());
 -            checkNotNull(cubeDesc, "cube descriptor '%s' (for cube '%s') not found", cube.getDescName(), cubeName);
 -            if (!isSpecialTestCube(cubeName))
 -                checkState(cubeDesc.getName().equals(cubeName),
 -                        "cube name '%s' must be same as descriptor name '%s', but it is not", cubeName,
 -                        cubeDesc.getName());
 -
 -            if (!cubeDesc.getError().isEmpty()) {
 -                cube.setStatus(RealizationStatusEnum.DESCBROKEN);
 -                logger.error("cube descriptor {} (for cube '{}') is broken", cubeDesc.getResourcePath(), cubeName);
 -                for (String error : cubeDesc.getError()) {
 -                    logger.error("Error: {}", error);
 -                }
 -            } else if (cube.getStatus() == RealizationStatusEnum.DESCBROKEN) {
 -                cube.setStatus(RealizationStatusEnum.DISABLED);
 -                logger.info("cube {} changed from DESCBROKEN to DISABLED", cubeName);
++                if (StringUtils.isBlank(seg.getLastBuildJobID()))
++                    throw new IllegalStateException("For cube " + cubeCopy + ", segment " + seg + " missing LastBuildJobID");
++
++                seg.setStatus(SegmentStatusEnum.READY);
+             }
+ 
 -            cube.setConfig((KylinConfigExt) cubeDesc.getConfig());
 -            cubeMap.putLocal(cubeName, cube);
++            logger.info("Promoting cube " + cubeCopy + ", new segments " + Arrays.toString(optSegCopy)
++                    + ", to remove segments " + originalSegments);
++
++            CubeUpdate update = new CubeUpdate(cubeCopy);
++            update.setToRemoveSegs(originalSegments) //
++                    .setToUpdateSegs(optSegCopy) //
++                    .setStatus(RealizationStatusEnum.READY) //
++                    .setCuboids(recommendCuboids) //
++                    .setCuboidsRecommend(Sets.<Long> newHashSet());
++            updateCube(update);
++        }
+ 
 -            for (CubeSegment segment : cube.getSegments()) {
 -                usedStorageLocation.put(segment.getUuid(), segment.getStorageLocationIdentifier());
++        private void validateNewSegments(CubeInstance cube, CubeSegment newSegments) {
 +            List<CubeSegment> tobe = cube.calculateToBeSegments(newSegments);
 +            List<CubeSegment> newList = Arrays.asList(newSegments);
 +            if (tobe.containsAll(newList) == false) {
 +                throw new IllegalStateException("For cube " + cube + ", the new segments " + newList
 +                        + " do not fit in its current " + cube.getSegments() + "; the resulted tobe is " + tobe);
              }
 +        }
  
 -            logger.info("Reloaded cube {} being {} having {} segments", cubeName, cube, cube.getSegments().size());
 -            return cube;
 +        /**
 +         * Calculate the holes (gaps) in segments.
 +         * @param cubeName
 +         * @return
 +         */
 +        public List<CubeSegment> calculateHoles(String cubeName) {
 +            List<CubeSegment> holes = Lists.newArrayList();
 +            final CubeInstance cube = getCube(cubeName);
 +            Preconditions.checkNotNull(cube); // check before dereferencing, otherwise an NPE preempts this guard
 +            DataModelDesc modelDesc = cube.getModel();
 +            final List<CubeSegment> segments = cube.getSegments();
 +            logger.info("totally " + segments.size() + " cubeSegments");
 +            if (segments.size() == 0) {
 +                return holes;
 +            }
  
 -        } catch (Exception e) {
 -            logger.error("Error during load cube instance, skipping : " + path, e);
 -            return null;
 +            Collections.sort(segments);
 +            for (int i = 0; i < segments.size() - 1; ++i) {
 +                CubeSegment first = segments.get(i);
 +                CubeSegment second = segments.get(i + 1);
 +                if (first.getSegRange().connects(second.getSegRange()))
 +                    continue;
 +
 +                if (first.getSegRange().apartBefore(second.getSegRange())) {
 +                    CubeSegment hole = new CubeSegment();
 +                    hole.setCubeInstance(cube);
 +                    if (first.isOffsetCube()) {
 +                        hole.setSegRange(new SegmentRange(first.getSegRange().end, second.getSegRange().start));
 +                        hole.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetEnd());
 +                        hole.setSourcePartitionOffsetEnd(second.getSourcePartitionOffsetStart());
 +                        hole.setName(CubeSegment.makeSegmentName(null, hole.getSegRange(), modelDesc));
 +                    } else {
 +                        hole.setTSRange(new TSRange(first.getTSRange().end.v, second.getTSRange().start.v));
 +                        hole.setName(CubeSegment.makeSegmentName(hole.getTSRange(), null, modelDesc));
 +                    }
 +                    holes.add(hole);
 +                }
 +            }
 +            return holes;
          }
 -    }
  
 -    private boolean isSpecialTestCube(String cubeName) {
 -        return cubeName.equals("kylin_sales_cube") //
 -                || config.isDevEnv()
 -                        && (cubeName.startsWith("test_kylin_cube") || cubeName.startsWith("test_streaming"));
      }
  
 -    private TableMetadataManager getTableManager() {
 -        return TableMetadataManager.getInstance(config);
 -    }
 -    
 -    private DictionaryManager getDictionaryManager() {
 -        return DictionaryManager.getInstance(config);
 +    // ============================================================================
 +    // Dictionary/Snapshot related methods
 +    // ============================================================================
 +
 +    public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable)
 +            throws IOException {
 +        return dictAssist.buildDictionary(cubeSeg, col, inpTable);
      }
  
 -    private SnapshotManager getSnapshotManager() {
 -        return SnapshotManager.getInstance(config);
 +    public DictionaryInfo saveDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable,
 +            Dictionary<String> dict) throws IOException {
 +        return dictAssist.saveDictionary(cubeSeg, col, inpTable, dict);
      }
  
 -    private ResourceStore getStore() {
 -        return ResourceStore.getStore(this.config);
 +    /**
 +     * return null if no dictionary for given column
 +     */
 +    public Dictionary<String> getDictionary(CubeSegment cubeSeg, TblColRef col) {
 +        return dictAssist.getDictionary(cubeSeg, col);
      }
  
 -    @Override
 -    public RealizationType getRealizationType() {
 -        return RealizationType.CUBE;
 +    public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable) throws IOException {
 +        return dictAssist.buildSnapshotTable(cubeSeg, lookupTable);
      }
  
 -    @Override
 -    public IRealization getRealization(String name) {
 -        return getCube(name);
 +    public LookupStringTable getLookupTable(CubeSegment cubeSegment, JoinDesc join) {
 +        return dictAssist.getLookupTable(cubeSegment, join);
      }
  
-     //UHC (ultra high cardinality column): contain the ShardByColumns and the GlobalDictionaryColumns
-     public int[] getUHCIndex(CubeDesc cubeDesc) throws IOException {
-         return dictAssist.getUHCIndex(cubeDesc);
-     }
- 
 -    // ============================================================================
 +    private class DictionaryAssist {
 +        public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable)
 +                throws IOException {
 +            CubeDesc cubeDesc = cubeSeg.getCubeDesc();
 +            if (!cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col))
 +                return null;
  
 -    public List<TblColRef> getAllGlobalDictColumns(CubeDesc cubeDesc) {
 -        List<TblColRef> globalDictCols = new ArrayList<TblColRef>();
 -        List<DictionaryDesc> dictionaryDescList = cubeDesc.getDictionaries();
 +            String builderClass = cubeDesc.getDictionaryBuilderClass(col);
 +            DictionaryInfo dictInfo = getDictionaryManager().buildDictionary(col, inpTable, builderClass);
  
 -        if (dictionaryDescList == null) {
 -            return globalDictCols;
 +            saveDictionaryInfo(cubeSeg, col, dictInfo);
 +            return dictInfo;
          }
  
 -        for (DictionaryDesc dictionaryDesc : dictionaryDescList) {
 -            if (dictionaryDesc.getBuilderClass() != null) {
 -                globalDictCols.add(dictionaryDesc.getColumnRef());
 -            }
 +        public DictionaryInfo saveDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable,
 +                Dictionary<String> dict) throws IOException {
 +            CubeDesc cubeDesc = cubeSeg.getCubeDesc();
 +            if (!cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col))
 +                return null;
 +
 +            DictionaryInfo dictInfo = getDictionaryManager().saveDictionary(col, inpTable, dict);
 +
 +            saveDictionaryInfo(cubeSeg, col, dictInfo);
 +            return dictInfo;
          }
 -        return globalDictCols;
 -    }
  
 -    //UHC (ultra high cardinality column): contain the ShardByColumns and the GlobalDictionaryColumns
 -    public List<TblColRef> getAllUHCColumns(CubeDesc cubeDesc) {
 -        List<TblColRef> uhcColumns = new ArrayList<TblColRef>();
 -        uhcColumns.addAll(getAllGlobalDictColumns(cubeDesc));
 -        uhcColumns.addAll(cubeDesc.getShardByColumns());
 -        return uhcColumns;
 -    }
 +        private void saveDictionaryInfo(CubeSegment cubeSeg, TblColRef col, DictionaryInfo dictInfo)
 +                throws IOException {
 +            if (dictInfo == null)
 +                return;
  
 -    public int[] getUHCIndex(CubeDesc cubeDesc) throws IOException {
 -        List<TblColRef> factDictCols = Lists.newArrayList(cubeDesc.getAllColumnsNeedDictionaryBuilt());
 -        List<TblColRef> uhcColumns = getAllUHCColumns(cubeDesc);
 -        int[] uhcIndex = new int[factDictCols.size()];
 +            // work on copy instead of cached objects
 +            CubeInstance cubeCopy = cubeSeg.getCubeInstance().latestCopyForWrite(); // get a latest copy
 +            CubeSegment segCopy = cubeCopy.getSegmentById(cubeSeg.getUuid());
  
 -        for (int i = 0; i < factDictCols.size(); i++) {
 -            if (uhcColumns.contains(factDictCols.get(i))) {
 -                uhcIndex[i] = 1;
 +            Dictionary<?> dict = dictInfo.getDictionaryObject();
 +            segCopy.putDictResPath(col, dictInfo.getResourcePath());
 +            segCopy.getRowkeyStats().add(new Object[] { col.getIdentity(), dict.getSize(), dict.getSizeOfId() });
 +
 +            CubeUpdate update = new CubeUpdate(cubeCopy);
 +            update.setToUpdateSegs(segCopy);
 +            updateCube(update);
 +        }
 +
 +        /**
 +         * return null if no dictionary for given column
 +         */
 +        @SuppressWarnings("unchecked")
 +        public Dictionary<String> getDictionary(CubeSegment cubeSeg, TblColRef col) {
 +            DictionaryInfo info = null;
 +            try {
 +                DictionaryManager dictMgr = getDictionaryManager();
 +                String dictResPath = cubeSeg.getDictResPath(col);
 +                if (dictResPath == null)
 +                    return null;
 +
 +                info = dictMgr.getDictionaryInfo(dictResPath);
 +                if (info == null)
 +                    throw new IllegalStateException("No dictionary found by " + dictResPath
 +                            + ", invalid cube state; cube segment" + cubeSeg + ", col " + col);
 +            } catch (IOException e) {
 +                throw new IllegalStateException("Failed to get dictionary for cube segment" + cubeSeg + ", col" + col,
 +                        e);
              }
 +            return (Dictionary<String>) info.getDictionaryObject();
          }
  
 -        return uhcIndex;
 -    }
 +        public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable) throws IOException {
 +            // work on copy instead of cached objects
 +            CubeInstance cubeCopy = cubeSeg.getCubeInstance().latestCopyForWrite(); // get a latest copy
 +            CubeSegment segCopy = cubeCopy.getSegmentById(cubeSeg.getUuid());
  
 -    /**
 -     * Calculate the holes (gaps) in segments.
 -     * @param cubeName
 -     * @return
 -     */
 -    public List<CubeSegment> calculateHoles(String cubeName) {
 -        List<CubeSegment> holes = Lists.newArrayList();
 -        final CubeInstance cube = getCube(cubeName);
 -        Preconditions.checkNotNull(cube);
 -        final List<CubeSegment> segments = cube.getSegments();
 -        logger.info("totally " + segments.size() + " cubeSegments");
 -        if (segments.size() == 0) {
 -            return holes;
 +            TableMetadataManager metaMgr = getTableManager();
 +            SnapshotManager snapshotMgr = getSnapshotManager();
 +
 +            TableDesc tableDesc = new TableDesc(metaMgr.getTableDesc(lookupTable, segCopy.getProject()));
 +            IReadableTable hiveTable = SourceFactory.createReadableTable(tableDesc);
 +            SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc);
 +
 +            segCopy.putSnapshotResPath(lookupTable, snapshot.getResourcePath());
 +            CubeUpdate update = new CubeUpdate(cubeCopy);
 +            update.setToUpdateSegs(segCopy);
 +            updateCube(update);
 +
 +            return snapshot;
          }
  
 -        Collections.sort(segments);
 -        for (int i = 0; i < segments.size() - 1; ++i) {
 -            CubeSegment first = segments.get(i);
 -            CubeSegment second = segments.get(i + 1);
 -            if (first.getSegRange().connects(second.getSegRange()))
 -                continue;
 -            
 -            if (first.getSegRange().apartBefore(second.getSegRange())) {
 -                CubeSegment hole = new CubeSegment();
 -                hole.setCubeInstance(cube);
 -                if (first.isOffsetCube()) {
 -                    hole.setSegRange(new SegmentRange(first.getSegRange().end, second.getSegRange().start));
 -                    hole.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetEnd());
 -                    hole.setSourcePartitionOffsetEnd(second.getSourcePartitionOffsetStart());
 -                    hole.setName(CubeSegment.makeSegmentName(null, hole.getSegRange()));
 -                } else {
 -                    hole.setTSRange(new TSRange(first.getTSRange().end.v, second.getTSRange().start.v));
 -                    hole.setName(CubeSegment.makeSegmentName(hole.getTSRange(), null));
 -                }
 -                holes.add(hole);
 +        public LookupStringTable getLookupTable(CubeSegment cubeSegment, JoinDesc join) {
 +
 +            String tableName = join.getPKSide().getTableIdentity();
 +            String[] pkCols = join.getPrimaryKey();
 +            String snapshotResPath = cubeSegment.getSnapshotResPath(tableName);
 +            if (snapshotResPath == null)
 +                throw new IllegalStateException("No snapshot for table '" + tableName + "' found on cube segment"
 +                        + cubeSegment.getCubeInstance().getName() + "/" + cubeSegment);
 +
 +            try {
 +                SnapshotTable snapshot = getSnapshotManager().getSnapshotTable(snapshotResPath);
 +                TableDesc tableDesc = getTableManager().getTableDesc(tableName, cubeSegment.getProject());
 +                return new LookupStringTable(tableDesc, pkCols, snapshot);
 +            } catch (IOException e) {
 +                throw new IllegalStateException(
 +                        "Failed to load lookup table " + tableName + " from 
snapshot " + snapshotResPath, e);
              }
          }
- 
-         private final String GLOBAL_DICTIONNARY_CLASS = 
"org.apache.kylin.dict.GlobalDictionaryBuilder";
- 
-         //UHC (ultra high cardinality column): contain the ShardByColumns and 
the GlobalDictionaryColumns
-         public int[] getUHCIndex(CubeDesc cubeDesc) throws IOException {
-             List<TblColRef> dictCols = 
Lists.newArrayList(cubeDesc.getAllColumnsNeedDictionaryBuilt());
-             int[] uhcIndex = new int[dictCols.size()];
- 
-             //add GlobalDictionaryColumns
-             List<DictionaryDesc> dictionaryDescList = 
cubeDesc.getDictionaries();
-             if (dictionaryDescList != null) {
-                 for (DictionaryDesc dictionaryDesc : dictionaryDescList) {
-                     if (dictionaryDesc.getBuilderClass() != null
-                             && 
dictionaryDesc.getBuilderClass().equalsIgnoreCase(GLOBAL_DICTIONNARY_CLASS)) {
-                         for (int i = 0; i < dictCols.size(); i++) {
-                             if 
(dictCols.get(i).equals(dictionaryDesc.getColumnRef())) {
-                                 uhcIndex[i] = 1;
-                                 break;
-                             }
-                         }
-                     }
-                 }
-             }
- 
-             //add ShardByColumns
-             Set<TblColRef> shardByColumns = cubeDesc.getShardByColumns();
-             for (int i = 0; i < dictCols.size(); i++) {
-                 if (shardByColumns.contains(dictCols.get(i))) {
-                     uhcIndex[i] = 1;
-                 }
-             }
- 
-             return uhcIndex;
-         }
 -        return holes;
      }
  
  }
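
The buildSnapshotTable() body above shows the new copy-on-write discipline: callers
never mutate the cached CubeInstance; they take a latest copy, modify the copied
segment, and push the change back through a CubeUpdate. A minimal sketch of that
pattern, assuming a CubeManager mgr; the cube name, segment UUID and snapshot path
are illustrative placeholders:

    // Hedged sketch of the copy-on-write update used by buildSnapshotTable().
    CubeInstance cubeCopy = mgr.getCube("my_cube").latestCopyForWrite();
    CubeSegment segCopy = cubeCopy.getSegmentById(segUuid);

    segCopy.putSnapshotResPath("DEFAULT.MY_LOOKUP", snapshotResPath);

    CubeUpdate update = new CubeUpdate(cubeCopy);   // wraps all pending changes
    update.setToUpdateSegs(segCopy);                // only the touched segment
    mgr.updateCube(update);                         // persists and refreshes the cache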

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java
----------------------------------------------------------------------
diff --cc core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java
index 1d4e722,2e1d652..378d082
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java
@@@ -34,9 -35,10 +35,10 @@@ public class CubeUpdate 
      private String owner;
      private int cost = -1;
      private Map<Long, Long> cuboids = null;
+     private Set<Long> cuboidsRecommend = null;
  
      public CubeUpdate(CubeInstance cubeInstance) {
 -        this.cubeInstance = cubeInstance;
 +        setCubeInstance(cubeInstance);
      }
  
      public CubeInstance getCubeInstance() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
----------------------------------------------------------------------
diff --cc core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
index 1110a5d,d9c7803..7d332bb
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
@@@ -52,16 -56,44 +52,22 @@@ public class Cuboid implements Comparab
          }
      };
  
 -    // this is the only entry point for query to find the right cuboid for a 
segment
 -    public static Cuboid identifyCuboid(CubeSegment cubeSegment, 
Set<TblColRef> dimensions,
 -            Collection<FunctionDesc> metrics) {
 -        return identifyCuboid(cubeSegment.getCuboidScheduler(), dimensions, 
metrics);
 -    }
 -
 -    // this is the only entry point for query to find the right cuboid for a 
cube instance
 -    public static Cuboid identifyCuboid(CubeInstance cubeInstance, 
Set<TblColRef> dimensions,
 -            Collection<FunctionDesc> metrics) {
 -        return identifyCuboid(cubeInstance.getCuboidScheduler(), dimensions, 
metrics);
++    // for mandatory cuboid, no need to translate cuboid
++    public static Cuboid findForMandatory(CubeDesc cube, long cuboidID) {
++        return new Cuboid(cube, cuboidID, cuboidID);
+     }
 -
 -    public static Cuboid identifyCuboid(CuboidScheduler cuboidScheduler, 
Set<TblColRef> dimensions,
++    
 +    public static Cuboid findCuboid(CuboidScheduler cuboidScheduler, 
Set<TblColRef> dimensions,
              Collection<FunctionDesc> metrics) {
 -        long cuboidID = identifyCuboidId(cuboidScheduler.getCubeDesc(), 
dimensions, metrics);
 +        long cuboidID = toCuboidId(cuboidScheduler.getCubeDesc(), dimensions, 
metrics);
          return Cuboid.findById(cuboidScheduler, cuboidID);
      }
  
 -    public static long identifyCuboidId(CubeDesc cubeDesc, Set<TblColRef> 
dimensions, Collection<FunctionDesc> metrics) {
 -        for (FunctionDesc metric : metrics) {
 -            if (metric.getMeasureType().onlyAggrInBaseCuboid())
 -                return Cuboid.getBaseCuboidId(cubeDesc);
 -        }
 -
 -        long cuboidID = 0;
 -        for (TblColRef column : dimensions) {
 -            int index = cubeDesc.getRowkey().getColumnBitIndex(column);
 -            cuboidID |= 1L << index;
 -        }
 -        return cuboidID;
 -    }
 -
 -    // for mandatory cuboid, no need to translate cuboid
 -    public static Cuboid findForMandatory(CubeDesc cube, long cuboidID) {
 -        return new Cuboid(cube, cuboidID, cuboidID);
 +    public static Cuboid findById(CuboidScheduler cuboidScheduler, byte[] 
cuboidID) {
 +        return findById(cuboidScheduler, Bytes.toLong(cuboidID));
      }
  
+     @Deprecated
      public static Cuboid findById(CubeSegment cubeSegment, long cuboidID) {
          return findById(cubeSegment.getCuboidScheduler(), cuboidID);
      }
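
The removed identifyCuboidId() (now toCuboidId()) derives the target cuboid by
OR-ing one row-key bit per queried dimension, falling back to the base cuboid when
a measure can only aggregate there. A sketch of that derivation, with cubeDesc and
dimensions assumed as inputs:

    // Grounded in the removed identifyCuboidId() body above.
    long cuboidID = 0;
    for (TblColRef column : dimensions) {
        int index = cubeDesc.getRowkey().getColumnBitIndex(column);
        cuboidID |= 1L << index;        // one bit per row-key dimension
    }
    // A metric whose MeasureType onlyAggrInBaseCuboid() returns true forces
    // Cuboid.getBaseCuboidId(cubeDesc) instead.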

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --cc core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 32578ed,c4e1ced..88a8e43
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@@ -54,6 -54,6 +55,8 @@@ import org.apache.kylin.common.util.Arr
  import org.apache.kylin.common.util.JsonUtil;
  import org.apache.kylin.common.util.Pair;
  import org.apache.kylin.cube.cuboid.CuboidScheduler;
++import org.apache.kylin.dict.GlobalDictionaryBuilder;
++import org.apache.kylin.dict.global.SegmentAppendTrieDictBuilder;
  import org.apache.kylin.measure.MeasureType;
  import org.apache.kylin.measure.extendedcolumn.ExtendedColumnMeasureType;
  import org.apache.kylin.metadata.MetadataConstants;
@@@ -191,12 -186,15 +196,17 @@@ public class CubeDesc extends RootPersi
      @JsonInclude(JsonInclude.Include.NON_NULL)
      private int parentForward = 3;
  
+     @JsonProperty("mandatory_dimension_set_list")
+     @JsonInclude(JsonInclude.Include.NON_NULL)
+     private List<Set<String>> mandatoryDimensionSetList = 
Collections.emptyList();
+ 
 -    private Set<Long> mandatoryCuboids = Sets.newHashSet();
 +    // Error messages during resolving json metadata
 +    private List<String> errors = new ArrayList<String>();
  
      private LinkedHashSet<TblColRef> allColumns = new LinkedHashSet<>();
      private LinkedHashSet<ColumnDesc> allColumnDescs = new LinkedHashSet<>();
      private LinkedHashSet<TblColRef> dimensionColumns = new LinkedHashSet<>();
++    private Set<Long> mandatoryCuboids = new HashSet<>();
  
      private Map<TblColRef, DeriveInfo> derivedToHostMap = Maps.newHashMap();
      private Map<Array<TblColRef>, List<DeriveInfo>> hostToDerivedMap = 
Maps.newHashMap();
@@@ -1333,6 -1391,6 +1410,31 @@@
          }
          return null;
      }
++    
++    public List<TblColRef> getAllGlobalDictColumns() {
++        List<TblColRef> globalDictCols = new ArrayList<TblColRef>();
++        List<DictionaryDesc> dictionaryDescList = getDictionaries();
++
++        if (dictionaryDescList == null) {
++            return globalDictCols;
++        }
++
++        for (DictionaryDesc dictionaryDesc : dictionaryDescList) {
++            String cls = dictionaryDesc.getBuilderClass();
++            if (GlobalDictionaryBuilder.class.getName().equals(cls) || 
SegmentAppendTrieDictBuilder.class.getName().equals(cls))
++                globalDictCols.add(dictionaryDesc.getColumnRef());
++        }
++        return globalDictCols;
++    }
++    
++    // UHC (ultra high cardinality column): contains the ShardByColumns and the GlobalDictionaryColumns
++    public List<TblColRef> getAllUHCColumns() {
++        List<TblColRef> uhcColumns = new ArrayList<TblColRef>();
++        uhcColumns.addAll(getAllGlobalDictColumns());
++        uhcColumns.addAll(getShardByColumns());
++        return uhcColumns;
++    }
++
  
      public String getProject() {
          return getModel().getProject();
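
getAllUHCColumns() replaces the int[]-based CubeManager.getUHCIndex() removed
earlier in this commit. A hedged adapter for callers that still want the index
form; as in the old code, dictCols ordering follows
getAllColumnsNeedDictionaryBuilt():

    // Illustrative adapter only, not part of this commit.
    List<TblColRef> dictCols =
            Lists.newArrayList(cubeDesc.getAllColumnsNeedDictionaryBuilt());
    List<TblColRef> uhcCols = cubeDesc.getAllUHCColumns();
    int[] uhcIndex = new int[dictCols.size()];
    for (int i = 0; i < dictCols.size(); i++) {
        if (uhcCols.contains(dictCols.get(i)))
            uhcIndex[i] = 1;            // 1 marks a UHC column
    }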

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-cube/src/test/java/org/apache/kylin/cube/AggregationGroupRuleTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-cube/src/test/java/org/apache/kylin/cube/CubeDescTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
----------------------------------------------------------------------
diff --cc core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
index f293472,03dd928..8953868
--- a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
@@@ -308,6 -306,90 +309,82 @@@ public class CubeManagerTest extends Lo
      }
  
      @Test
+     public void testAutoMergeWithVolatileRange() throws Exception {
+         CubeManager mgr = CubeManager.getInstance(getTestConfig());
+         CubeInstance cube = mgr.getCube("test_kylin_cube_with_slr_empty");
+ 
 -        cube.getDescriptor().setAutoMergeTimeRanges(new long[] { 2000, 6000 
});
 -
 -        mgr.updateCube(new CubeUpdate(cube));
++        CubeDesc desc = cube.getDescriptor();
++        desc.setAutoMergeTimeRanges(new long[] { 2000, 6000 });
++        CubeDescManager.getInstance(getTestConfig()).updateCubeDesc(desc);
+ 
++        cube = mgr.getCube(cube.getName());
+         assertTrue(cube.needAutoMerge());
+ 
+         // no segment at first
+         assertEquals(0, cube.getSegments().size());
+ 
+         // append first
+         CubeSegment seg1 = mgr.appendSegment(cube, new TSRange(0L, 1000L));
 -        seg1.setStatus(SegmentStatusEnum.READY);
++        mgr.updateCubeSegStatus(seg1, SegmentStatusEnum.READY);
+ 
+         CubeSegment seg3 = mgr.appendSegment(cube, new TSRange(2000L, 4000L));
 -        seg3.setStatus(SegmentStatusEnum.READY);
++        mgr.updateCubeSegStatus(seg3, SegmentStatusEnum.READY);
+ 
++        cube = mgr.getCube(cube.getName());
+         assertEquals(2, cube.getSegments().size());
+ 
+         SegmentRange mergedSeg = cube.autoMergeCubeSegments();
 -
+         assertTrue(mergedSeg == null);
+ 
+         assertEquals(2, cube.getSegments().size());
+ 
+         // append a new seg
 -
+         CubeSegment seg4 = mgr.appendSegment(cube, new TSRange(4000L, 8000L));
 -        seg4.setStatus(SegmentStatusEnum.READY);
++        mgr.updateCubeSegStatus(seg4, SegmentStatusEnum.READY);
+ 
++        cube = mgr.getCube(cube.getName());
+         assertEquals(3, cube.getSegments().size());
+ 
+         cube.getDescriptor().setVolatileRange(10000);
+ 
+         mergedSeg = cube.autoMergeCubeSegments();
 -
+         assertTrue(mergedSeg == null);
+ 
+         //will merge after changing the volatile_range
+ 
+         cube.getDescriptor().setVolatileRange(0);
+ 
+         mergedSeg = cube.autoMergeCubeSegments();
 -
+         assertTrue(mergedSeg != null);
 -
+         assertTrue((Long) mergedSeg.start.v == 2000 && (Long) mergedSeg.end.v 
== 8000);
+ 
+         // fill the gap
 -
+         CubeSegment seg2 = mgr.appendSegment(cube, new TSRange(1000L, 2000L));
 -        seg2.setStatus(SegmentStatusEnum.READY);
++        mgr.updateCubeSegStatus(seg2, SegmentStatusEnum.READY);
+ 
++        cube = mgr.getCube(cube.getName());
+         assertEquals(4, cube.getSegments().size());
+ 
+         cube.getDescriptor().setVolatileRange(10000);
+ 
+         mergedSeg = cube.autoMergeCubeSegments();
 -
+         assertTrue(mergedSeg == null);
+ 
+         //will merge after changing the volatile_range
+         cube.getDescriptor().setVolatileRange(0);
+ 
+         mergedSeg = cube.autoMergeCubeSegments();
 -
+         assertTrue(mergedSeg != null);
 -
+         assertTrue((Long) mergedSeg.start.v == 0 && (Long) mergedSeg.end.v == 
8000);
+ 
+         cube.getDescriptor().setVolatileRange(1000);
+ 
+         mergedSeg = cube.autoMergeCubeSegments();
 -
+         assertTrue(mergedSeg != null);
 -
+         assertTrue((Long) mergedSeg.start.v == 0 && (Long) mergedSeg.end.v == 
2000);
 -
+     }
+ 
+     @Test
      public void testGetCubeNameWithNamespace() {
          System.setProperty("kylin.storage.hbase.table-name-prefix", "HELLO_");
          try {
@@@ -317,8 -399,43 +394,40 @@@
          } finally {
              System.clearProperty("kylin.storage.hbase.table-name-prefix");
          }
+ 
+         System.setProperty("kylin.storage.hbase.namespace", "MYSPACE");
+         try {
+             CubeManager mgr = CubeManager.getInstance(getTestConfig());
+             String tablename = mgr.generateStorageLocation();
+             assertTrue(tablename.startsWith("MYSPACE:"));
+         } finally {
+             System.clearProperty("kylin.storage.hbase.namespace");
+         }
      }
  
+     @Test
+     public void testBuildCubeWithPartitionStartDate() throws IOException {
+         Long PARTITION_DATE_START = 1513123200L;
+         Long FIRST_BUILD_DATE_END = 1514764800L;
+         Long SECOND_BUILD_DATE_END = 1540339200L;
+ 
+         KylinConfig config = getTestConfig();
+         CubeManager cubeManager = CubeManager.getInstance(config);
+         CubeInstance cube = 
cubeManager.getCube("test_kylin_cube_with_slr_empty");
+         cube.getDescriptor().setPartitionDateStart(PARTITION_DATE_START);
+ 
+         CubeSegment segment = cubeManager.appendSegment(cube, new TSRange(0L, 
FIRST_BUILD_DATE_END), null, null, null);
+         assertEquals(segment._getDateRangeStart(), 
PARTITION_DATE_START.longValue());
+         assertEquals(segment._getDateRangeEnd(), 
FIRST_BUILD_DATE_END.longValue());
+ 
 -        segment.setStatus(SegmentStatusEnum.READY);
 -        CubeUpdate cubeBuilder = new CubeUpdate(cube);
 -        cubeManager.updateCube(cubeBuilder);
++        cubeManager.updateCubeSegStatus(segment, SegmentStatusEnum.READY);
+ 
+         segment = cubeManager.appendSegment(cube, new TSRange(0L, 
SECOND_BUILD_DATE_END), null, null, null);
+         assertEquals(segment._getDateRangeStart(), 
FIRST_BUILD_DATE_END.longValue());
+         assertEquals(segment._getDateRangeEnd(), 
SECOND_BUILD_DATE_END.longValue());
 -
+     }
+ 
+ 
      public CubeDescManager getCubeDescManager() {
          return CubeDescManager.getInstance(getTestConfig());
      }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyDecoderTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyEncoderTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
----------------------------------------------------------------------
diff --cc 
core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
index 404d53c,5717542..fd7affd
--- 
a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
+++ 
b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
@@@ -42,8 -42,7 +42,8 @@@ public class GlobalDictionaryBuilder im
  
      private static Logger logger = 
LoggerFactory.getLogger(GlobalDictionaryBuilder.class);
  
 +    @Override
-     public void init(DictionaryInfo dictInfo, int baseId) throws IOException {
+     public void init(DictionaryInfo dictInfo, int baseId, String hdfsDir) 
throws IOException {
          sourceColumn = dictInfo.getSourceTable() + "_" + 
dictInfo.getSourceColumn();
          lock = 
KylinConfig.getInstanceFromEnv().getDistributedLockFactory().lockForCurrentThread();
          lock.lock(getLockPath(sourceColumn), Long.MAX_VALUE);

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryBuilder.java
----------------------------------------------------------------------
diff --cc 
core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryBuilder.java
index 0934a7d,18bbb07..e2a643d
--- 
a/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryBuilder.java
+++ 
b/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryBuilder.java
@@@ -28,7 -28,7 +28,7 @@@ import org.apache.kylin.common.util.Dic
  public interface IDictionaryBuilder {
  
      /** Sets the dictionary info for the dictionary being built. Mainly for 
GlobalDictionaryBuilder. */
-     void init(DictionaryInfo info, int baseId) throws IOException;
 -    void init(DictionaryInfo info, int baseId, String hdfsDir) throws 
IOException;
++    void init(DictionaryInfo info, int baseId, String hdfsWorkingDir) throws 
IOException;
      
      /** Add a new value into dictionary, returns it is accepted (not null) or 
not. */
      boolean addValue(String value);
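
Implementations of IDictionaryBuilder now have to accept the extra hdfsWorkingDir
argument, which may be null when the dictionary is built inside the Kylin job
server (see SegmentAppendTrieDictBuilder below). A skeletal implementation under
that assumption; the class name is hypothetical and the interface's remaining
methods are elided:

    public class NoOpDictionaryBuilder implements IDictionaryBuilder {

        @Override
        public void init(DictionaryInfo info, int baseId, String hdfsWorkingDir)
                throws IOException {
            // hdfsWorkingDir may be null on the job server; fall back to the
            // configured HDFS working directory in that case.
        }

        @Override
        public boolean addValue(String value) {
            return value != null;       // accept only non-null values
        }

        // ... build() and the interface's other methods omitted here
    }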

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-dictionary/src/main/java/org/apache/kylin/dict/Number2BytesConverter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-dictionary/src/main/java/org/apache/kylin/dict/global/SegmentAppendTrieDictBuilder.java
----------------------------------------------------------------------
diff --cc 
core-dictionary/src/main/java/org/apache/kylin/dict/global/SegmentAppendTrieDictBuilder.java
index c8bc13d,e5f2d57..cfc1f98
--- 
a/core-dictionary/src/main/java/org/apache/kylin/dict/global/SegmentAppendTrieDictBuilder.java
+++ 
b/core-dictionary/src/main/java/org/apache/kylin/dict/global/SegmentAppendTrieDictBuilder.java
@@@ -43,9 -43,9 +43,14 @@@ public class SegmentAppendTrieDictBuild
  
          KylinConfig config = KylinConfig.getInstanceFromEnv();
          int maxEntriesPerSlice = config.getAppendDictEntrySize();
++        if (hdfsDir == null) {
++            //build in Kylin job server
++            hdfsDir = 
KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory();
++        }
++
          //use UUID to make each segment dict in different HDFS dir and 
support concurrent build
          //use timestamp to make the segment dict easily to delete
-         String baseDir = config.getHdfsWorkingDirectory() + 
"resources/SegmentDict" + dictInfo.getResourceDir() + "/" + 
UUID.randomUUID().toString() + "_" + System.currentTimeMillis()+ "/";
+         String baseDir = hdfsDir + "resources/SegmentDict" + 
dictInfo.getResourceDir() + "/" + UUID.randomUUID().toString() + "_" + 
System.currentTimeMillis()+ "/";
  
          this.builder = new AppendTrieDictionaryBuilder(baseDir, 
maxEntriesPerSlice, false);
          this.baseId = baseId;
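
The null check above lets the same builder run inside the Kylin job server, where
no per-job HDFS dir is passed in. A sketch of the resulting base-dir layout, with
dictInfo assumed:

    // hdfsDir falls back to the global working directory when null.
    if (hdfsDir == null)
        hdfsDir = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory();

    // UUID isolates concurrent builds of the same dictionary; the timestamp
    // makes stale segment-dict directories easy to spot and delete, e.g.
    // <hdfsDir>resources/SegmentDict/<resource-dir>/<uuid>_<millis>/
    String baseDir = hdfsDir + "resources/SegmentDict" + dictInfo.getResourceDir()
            + "/" + UUID.randomUUID().toString() + "_" + System.currentTimeMillis() + "/";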

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
----------------------------------------------------------------------
diff --cc core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
index 27678ac,316fc99..7b8e413
--- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@@ -65,7 -67,7 +67,7 @@@ public class JoinedFlatTable 
      }
  
      public static String generateCreateTableStatement(IJoinedFlatTableDesc 
flatDesc, String storageDfsDir,
-             String format, String fieldDelimiter) {
 -            String storageFormat, String filedDelimiter) {
++            String storageFormat, String fieldDelimiter) {
          StringBuilder ddl = new StringBuilder();
  
          ddl.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + 
flatDesc.getTableName() + "\n");
@@@ -79,11 -81,12 +81,12 @@@
              ddl.append(colName(col) + " " + 
getHiveDataType(col.getDatatype()) + "\n");
          }
          ddl.append(")" + "\n");
-         if ("TEXTFILE".equals(format)) {
+         if ("TEXTFILE".equals(storageFormat)) {
 -            ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '" + 
filedDelimiter + "'\n");
 +            ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '" + 
fieldDelimiter + "'\n");
          }
-         ddl.append("STORED AS " + format + "\n");
+         ddl.append("STORED AS " + storageFormat + "\n");
          ddl.append("LOCATION '" + getTableDir(flatDesc, storageDfsDir) + 
"';").append("\n");
+         ddl.append("ALTER TABLE " + flatDesc.getTableName() + " SET 
TBLPROPERTIES('auto.purge'='true');\n");
          return ddl.toString();
      }
  

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
----------------------------------------------------------------------
diff --cc 
core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
index 8795e4c,9e53459..404db54
--- 
a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
@@@ -89,13 -90,13 +91,13 @@@ public class DefaultChainedExecutable e
      @Override
      protected void onExecuteFinished(ExecuteResult result, ExecutableContext 
executableContext) {
          ExecutableManager mgr = getManager();
 -        
 +
          if (isDiscarded()) {
              setEndTime(System.currentTimeMillis());
-             notifyUserStatusChange(executableContext, 
ExecutableState.DISCARDED);
+             onStatusChange(executableContext, result, 
ExecutableState.DISCARDED);
          } else if (isPaused()) {
              setEndTime(System.currentTimeMillis());
-             notifyUserStatusChange(executableContext, 
ExecutableState.STOPPED);
+             onStatusChange(executableContext, result, 
ExecutableState.STOPPED);
          } else if (result.succeed()) {
              List<? extends Executable> jobs = getTasks();
              boolean allSucceed = true;
@@@ -131,7 -125,9 +133,7 @@@
              } else if (hasError) {
                  setEndTime(System.currentTimeMillis());
                  mgr.updateJobOutput(getId(), ExecutableState.ERROR, null, 
null);
-                 notifyUserStatusChange(executableContext, 
ExecutableState.ERROR);
+                 onStatusChange(executableContext, result, 
ExecutableState.ERROR);
 -            } else if (hasRunning) {
 -                mgr.updateJobOutput(getId(), ExecutableState.RUNNING, null, 
null);
              } else if (hasDiscarded) {
                  setEndTime(System.currentTimeMillis());
                  mgr.updateJobOutput(getId(), ExecutableState.DISCARDED, null, 
null);
@@@ -145,9 -141,8 +147,8 @@@
          }
      }
  
-     @Override
-     public List<AbstractExecutable> getTasks() {
-         return subTasks;
+     protected void onStatusChange(ExecutableContext context, ExecuteResult 
result, ExecutableState state) {
 -        notifyUserStatusChange(context, state);
++        super.notifyUserStatusChange(context, state);
      }
  
      @Override
@@@ -170,31 -165,8 +171,36 @@@
          this.subTasks.add(executable);
      }
  
 +    private boolean retryFetchTaskStatus(Executable task) {
 +        boolean hasRunning = false;
 +        int retry = 1;
 +        while (retry <= 10) {
 +            ExecutableState retryState = task.getStatus();
 +            if (retryState == ExecutableState.RUNNING) {
 +                try {
 +                    Thread.sleep(100);
 +                } catch (InterruptedException e) {
 +                    logger.error("Failed to Sleep: ", e);
 +                }
 +                hasRunning = true;
 +                logger.error("With {} times retry, it's state is still 
RUNNING", retry);
 +            } else {
 +                logger.info("With {} times retry, status is changed to: {}", 
retry, retryState);
 +                hasRunning = false;
 +                break;
 +            }
 +            retry++;
 +        }
 +        if (hasRunning) {
 +            logger.error("Parent task: {} is finished, but it's subtask: {}'s 
state is still RUNNING \n"
 +                    + ", mark parent task failed.", getName(), 
task.getName());
 +            return false;
 +        }
 +        return true;
 +    }
++    
+     @Override
+     public int getDefaultPriority() {
+         return DEFAULT_PRIORITY;
+     }
  }
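
Because onStatusChange() is now a protected hook that delegates to
notifyUserStatusChange(), subclasses can intercept terminal states before the user
notification goes out. A hypothetical override (the class name is an assumption):

    public class AuditedChainedExecutable extends DefaultChainedExecutable {
        @Override
        protected void onStatusChange(ExecutableContext context,
                ExecuteResult result, ExecutableState state) {
            if (state == ExecutableState.ERROR) {
                // e.g. emit metrics or clean up before notifying the user
            }
            super.onStatusChange(context, result, state);
        }
    }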

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
----------------------------------------------------------------------
diff --cc 
core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
index 69d85ca,287f215..6af0192
--- 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
@@@ -49,9 -52,9 +52,39 @@@ import com.google.common.collect.Maps
   */
  public class DefaultScheduler implements Scheduler<AbstractExecutable>, 
ConnectionStateListener {
  
++    private static DefaultScheduler INSTANCE = null;
++
++    public static DefaultScheduler getInstance() {
++        if (INSTANCE == null) {
++            INSTANCE = createInstance();
++        }
++        return INSTANCE;
++    }
++    
++    public synchronized static DefaultScheduler createInstance() {
++        destroyInstance();
++        INSTANCE = new DefaultScheduler();
++        return INSTANCE;
++    }
++
++    public synchronized static void destroyInstance() {
++        DefaultScheduler tmp = INSTANCE;
++        INSTANCE = null;
++        if (tmp != null) {
++            try {
++                tmp.shutdown();
++            } catch (SchedulerException e) {
++                logger.error("error stop DefaultScheduler", e);
++                throw new RuntimeException(e);
++            }
++        }
++    }
++
++    // 
============================================================================
++    
      private JobLock jobLock;
      private ExecutableManager executableManager;
-     private FetcherRunner fetcher;
+     private Runnable fetcher;
      private ScheduledExecutorService fetcherPool;
      private ExecutorService jobPool;
      private DefaultContext context;
@@@ -59,11 -62,10 +92,9 @@@
      private static final Logger logger = 
LoggerFactory.getLogger(DefaultScheduler.class);
      private volatile boolean initialized = false;
      private volatile boolean hasStarted = false;
 +    volatile boolean fetchFailed = false;
      private JobEngineConfig jobEngineConfig;
  
--    private static DefaultScheduler INSTANCE = null;
--
      public DefaultScheduler() {
          if (INSTANCE != null) {
              throw new IllegalStateException("DefaultScheduler has been 
initiated.");
@@@ -195,25 -275,25 +322,6 @@@
          }
      }
  
--    public synchronized static DefaultScheduler createInstance() {
--        destroyInstance();
--        INSTANCE = new DefaultScheduler();
--        return INSTANCE;
--    }
--
--    public synchronized static void destroyInstance() {
--        DefaultScheduler tmp = INSTANCE;
--        INSTANCE = null;
--        if (tmp != null) {
--            try {
--                tmp.shutdown();
--            } catch (SchedulerException e) {
--                logger.error("error stop DefaultScheduler", e);
--                throw new RuntimeException(e);
--            }
--        }
--    }
--
      @Override
      public synchronized void init(JobEngineConfig jobEngineConfig, JobLock 
lock) throws SchedulerException {
          jobLock = lock;
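
With the singleton plumbing moved to the top of the class, the expected lifecycle
is getInstance()/createInstance() at startup and destroyInstance() at shutdown. A
hedged usage sketch, assuming a JobEngineConfig and JobLock from the caller; note
that the null check in getInstance() is itself unsynchronized, so a concurrent
first use should go through the synchronized createInstance():

    DefaultScheduler scheduler = DefaultScheduler.getInstance(); // creates lazily
    scheduler.init(jobEngineConfig, jobLock);                    // wires config and lock
    // ... at shutdown:
    DefaultScheduler.destroyInstance();   // shuts down and clears the instance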
