http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java ---------------------------------------------------------------------- diff --cc core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index 17e3aa5,e5ec2d7..1fbbbe6 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@@ -43,7 -45,7 +43,6 @@@ import org.apache.kylin.common.util.Dic import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.model.CubeDesc; --import org.apache.kylin.cube.model.DictionaryDesc; import org.apache.kylin.dict.DictionaryInfo; import org.apache.kylin.dict.DictionaryManager; import org.apache.kylin.dict.lookup.LookupStringTable; @@@ -378,632 -445,555 +378,694 @@@ public class CubeManager implements IRe return cube; } - // append a full build segment - public CubeSegment appendSegment(CubeInstance cube) throws IOException { - return appendSegment(cube, null, null, null, null); - } - - public CubeSegment appendSegment(CubeInstance cube, TSRange tsRange) throws IOException { - return appendSegment(cube, tsRange, null, null, null); + // for test + CubeInstance reloadCube(String cubeName) { + try (AutoLock lock = cubeMapLock.lockForWrite()) { + return crud.reload(cubeName); + } } - public CubeSegment appendSegment(CubeInstance cube, SourcePartition src) throws IOException { - return appendSegment(cube, src.getTSRange(), src.getSegRange(), src.getSourcePartitionOffsetStart(), - src.getSourcePartitionOffsetEnd()); + // for internal + CubeInstance reloadCubeQuietly(String cubeName) { + try (AutoLock lock = cubeMapLock.lockForWrite()) { + CubeInstance cube = crud.reloadQuietly(cubeName); + if (cube != null) + Cuboid.clearCache(cube); + return cube; + } } - CubeSegment appendSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange, - Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd) - throws IOException { - checkInputRanges(tsRange, segRange); - checkBuildingSegment(cube); - - // fix start/end a bit - if (cube.getModel().getPartitionDesc().isPartitioned()) { - // if missing start, set it to where last time ends - CubeSegment last = cube.getLastSegment(); - CubeDesc cubeDesc = cube.getDescriptor(); - if (tsRange != null && tsRange.start.v == 0) { - if (last == null) { - tsRange = new TSRange(cubeDesc.getPartitionDateStart(), tsRange.end.v); - } else if (!last.isOffsetCube()) { - tsRange = new TSRange(last.getTSRange().end.v, tsRange.end.v); + public void removeCubeLocal(String cubeName) { + try (AutoLock lock = cubeMapLock.lockForWrite()) { + CubeInstance cube = cubeMap.get(cubeName); + if (cube != null) { + cubeMap.removeLocal(cubeName); + for (CubeSegment segment : cube.getSegments()) { + usedStorageLocation.remove(segment.getUuid()); } + Cuboid.clearCache(cube); } - } else { - // full build - tsRange = null; - segRange = null; } + } - CubeSegment newSegment = newSegment(cube, tsRange, segRange); - newSegment.setSourcePartitionOffsetStart(sourcePartitionOffsetStart); - newSegment.setSourcePartitionOffsetEnd(sourcePartitionOffsetEnd); - validateNewSegments(cube, newSegment); + public CubeInstance dropCube(String cubeName, boolean deleteDesc) throws IOException { + try (AutoLock lock = cubeMapLock.lockForWrite()) { + logger.info("Dropping cube '" + cubeName + "'"); + // load projects before remove cube from project - CubeUpdate cubeBuilder = new 
CubeUpdate(cube); - cubeBuilder.setToAddSegs(newSegment); - updateCube(cubeBuilder); - return newSegment; - } + // delete cube instance and cube desc + CubeInstance cube = getCube(cubeName); - public CubeSegment refreshSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange) throws IOException { - checkInputRanges(tsRange, segRange); - checkBuildingSegment(cube); - - CubeSegment newSegment = newSegment(cube, tsRange, segRange); - - Pair<Boolean, Boolean> pair = cube.getSegments().fitInSegments(newSegment); - if (pair.getFirst() == false || pair.getSecond() == false) - throw new IllegalArgumentException("The new refreshing segment " + newSegment - + " does not match any existing segment in cube " + cube); - - if (segRange != null) { - CubeSegment toRefreshSeg = null; - for (CubeSegment cubeSegment : cube.getSegments()) { - if (cubeSegment.getSegRange().equals(segRange)) { - toRefreshSeg = cubeSegment; - break; - } - } + // remove cube and update cache + crud.delete(cube); + Cuboid.clearCache(cube); - if (toRefreshSeg == null) { - throw new IllegalArgumentException("For streaming cube, only one segment can be refreshed at one time"); + if (deleteDesc && cube.getDescriptor() != null) { + CubeDescManager.getInstance(config).removeCubeDesc(cube.getDescriptor()); } - newSegment.setSourcePartitionOffsetStart(toRefreshSeg.getSourcePartitionOffsetStart()); - newSegment.setSourcePartitionOffsetEnd(toRefreshSeg.getSourcePartitionOffsetEnd()); + // delete cube from project + ProjectManager.getInstance(config).removeRealizationsFromProjects(RealizationType.CUBE, cubeName); + + return cube; } + } - CubeUpdate cubeBuilder = new CubeUpdate(cube); - cubeBuilder.setToAddSegs(newSegment); - updateCube(cubeBuilder); + @VisibleForTesting + /*private*/ String generateStorageLocation() { + String namePrefix = config.getHBaseTableNamePrefix(); ++ String namespace = config.getHBaseStorageNameSpace(); + String tableName = ""; + Random ran = new Random(); + do { + StringBuffer sb = new StringBuffer(); ++ if ((namespace.equals("default") || namespace.equals("")) == false) { ++ sb.append(namespace).append(":"); ++ } + sb.append(namePrefix); + for (int i = 0; i < HBASE_TABLE_LENGTH; i++) { + sb.append(ALPHA_NUM.charAt(ran.nextInt(ALPHA_NUM.length()))); + } + tableName = sb.toString(); + } while (this.usedStorageLocation.containsValue(tableName)); + return tableName; + } - return newSegment; + public CubeInstance copyForWrite(CubeInstance cube) { + return crud.copyForWrite(cube); } - public CubeSegment[] optimizeSegments(CubeInstance cube, Set<Long> cuboidsRecommend) throws IOException { - checkReadyForOptimize(cube); + private boolean isReady(CubeSegment seg) { + return seg.getStatus() == SegmentStatusEnum.READY; + } - List<CubeSegment> readySegments = cube.getSegments(SegmentStatusEnum.READY); - CubeSegment[] optimizeSegments = new CubeSegment[readySegments.size()]; - int i = 0; - for (CubeSegment segment : readySegments) { - CubeSegment newSegment = newSegment(cube, segment.getTSRange(), null); - validateNewSegments(cube, newSegment); + private TableMetadataManager getTableManager() { + return TableMetadataManager.getInstance(config); + } - optimizeSegments[i++] = newSegment; - } + private DictionaryManager getDictionaryManager() { + return DictionaryManager.getInstance(config); + } - CubeUpdate cubeBuilder = new CubeUpdate(cube); - cubeBuilder.setCuboidsRecommend(cuboidsRecommend); - cubeBuilder.setToAddSegs(optimizeSegments); - updateCube(cubeBuilder); + private SnapshotManager getSnapshotManager() { + 
return SnapshotManager.getInstance(config); + } - return optimizeSegments; + private ResourceStore getStore() { + return ResourceStore.getStore(this.config); } - public CubeSegment mergeSegments(CubeInstance cube, TSRange tsRange, SegmentRange segRange, boolean force) - throws IOException { - if (cube.getSegments().isEmpty()) - throw new IllegalArgumentException("Cube " + cube + " has no segments"); - - checkInputRanges(tsRange, segRange); - checkBuildingSegment(cube); - checkCubeIsPartitioned(cube); - - if (cube.getSegments().getFirstSegment().isOffsetCube()) { - // offset cube, merge by date range? - if (segRange == null && tsRange != null) { - Pair<CubeSegment, CubeSegment> pair = cube.getSegments(SegmentStatusEnum.READY) - .findMergeOffsetsByDateRange(tsRange, Long.MAX_VALUE); - if (pair == null) - throw new IllegalArgumentException("Find no segments to merge by " + tsRange + " for cube " + cube); - segRange = new SegmentRange(pair.getFirst().getSegRange().start, pair.getSecond().getSegRange().end); - } - tsRange = null; - Preconditions.checkArgument(segRange != null); - } else { - segRange = null; - Preconditions.checkArgument(tsRange != null); - } - - CubeSegment newSegment = newSegment(cube, tsRange, segRange); - - Segments<CubeSegment> mergingSegments = cube.getMergingSegments(newSegment); - if (mergingSegments.size() <= 1) - throw new IllegalArgumentException("Range " + newSegment.getSegRange() - + " must contain at least 2 segments, but there is " + mergingSegments.size()); - - CubeSegment first = mergingSegments.get(0); - CubeSegment last = mergingSegments.get(mergingSegments.size() - 1); - if (first.isOffsetCube()) { - newSegment.setSegRange(new SegmentRange(first.getSegRange().start, last.getSegRange().end)); - newSegment.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetStart()); - newSegment.setSourcePartitionOffsetEnd(last.getSourcePartitionOffsetEnd()); - newSegment.setTSRange(null); - } else { - newSegment.setTSRange(new TSRange(mergingSegments.getTSStart(), mergingSegments.getTSEnd())); - newSegment.setSegRange(null); - } - - if (force == false) { - List<String> emptySegment = Lists.newArrayList(); - for (CubeSegment seg : mergingSegments) { - if (seg.getSizeKB() == 0) { - emptySegment.add(seg.getName()); - } - } + @Override + public RealizationType getRealizationType() { + return RealizationType.CUBE; + } - if (emptySegment.size() > 0) { - throw new IllegalArgumentException( - "Empty cube segment found, couldn't merge unless 'forceMergeEmptySegment' set to true: " - + emptySegment); - } - } + @Override + public IRealization getRealization(String name) { + return getCube(name); + } - validateNewSegments(cube, newSegment); + // ============================================================================ + // Segment related methods + // ============================================================================ - CubeUpdate cubeBuilder = new CubeUpdate(cube); - cubeBuilder.setToAddSegs(newSegment); - updateCube(cubeBuilder); + // append a full build segment + public CubeSegment appendSegment(CubeInstance cube) throws IOException { + return appendSegment(cube, null, null, null, null); + } - return newSegment; + public CubeSegment appendSegment(CubeInstance cube, TSRange tsRange) throws IOException { + return appendSegment(cube, tsRange, null, null, null); } - - private void checkInputRanges(TSRange tsRange, SegmentRange segRange) { - if (tsRange != null && segRange != null) { - throw new IllegalArgumentException("Build or refresh cube segment either by TSRange 
or by SegmentRange, not both."); + + public CubeSegment appendSegment(CubeInstance cube, SourcePartition src) throws IOException { + return appendSegment(cube, src.getTSRange(), src.getSegRange(), src.getSourcePartitionOffsetStart(), + src.getSourcePartitionOffsetEnd()); + } + + CubeSegment appendSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange, + Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd) + throws IOException { + try (AutoLock lock = cubeMapLock.lockForWrite()) { + return segAssist.appendSegment(cube, tsRange, segRange, sourcePartitionOffsetStart, + sourcePartitionOffsetEnd); } } - private void checkBuildingSegment(CubeInstance cube) { - checkBuildingSegment(cube, cube.getConfig().getMaxBuildingSegments()); + public CubeSegment refreshSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange) throws IOException { + try (AutoLock lock = cubeMapLock.lockForWrite()) { + return segAssist.refreshSegment(cube, tsRange, segRange); + } } - public void checkReadyForOptimize(CubeInstance cube) { - checkBuildingSegment(cube, 1); ++ public CubeSegment[] optimizeSegments(CubeInstance cube, Set<Long> cuboidsRecommend) throws IOException { ++ try (AutoLock lock = cubeMapLock.lockForWrite()) { ++ return segAssist.optimizeSegments(cube, cuboidsRecommend); ++ } ++ } ++ + public CubeSegment mergeSegments(CubeInstance cube, TSRange tsRange, SegmentRange segRange, boolean force) + throws IOException { + try (AutoLock lock = cubeMapLock.lockForWrite()) { + return segAssist.mergeSegments(cube, tsRange, segRange, force); + } } - private void checkBuildingSegment(CubeInstance cube, int maxBuildingSeg) { - if (cube.getBuildingSegments().size() >= maxBuildingSeg) { - throw new IllegalStateException( - "There is already " + cube.getBuildingSegments().size() + " building segment; "); + public void promoteNewlyBuiltSegments(CubeInstance cube, CubeSegment newSegment) throws IOException { + try (AutoLock lock = cubeMapLock.lockForWrite()) { + segAssist.promoteNewlyBuiltSegments(cube, newSegment); } } - public void validateNewSegments(CubeInstance cube, CubeSegment newSegments) { - segAssist.validateNewSegments(cube, newSegments); - private void checkCubeIsPartitioned(CubeInstance cube) { - if (cube.getDescriptor().getModel().getPartitionDesc().isPartitioned() == false) { - throw new IllegalStateException( - "there is no partition date column specified, only full build is supported"); ++ public void promoteNewlyOptimizeSegments(CubeInstance cube, CubeSegment... optimizedSegments) throws IOException { ++ try (AutoLock lock = cubeMapLock.lockForWrite()) { ++ segAssist.promoteNewlyOptimizeSegments(cube, optimizedSegments); + } } - /** - * After cube update, reload cube related cache - * - * @param cubeName - */ - public CubeInstance reloadCubeLocal(String cubeName) { - CubeInstance cubeInstance = reloadCubeLocalAt(CubeInstance.concatResourcePath(cubeName)); - Cuboid.clearCache(cubeInstance); - return cubeInstance; ++ public void promoteCheckpointOptimizeSegments(CubeInstance cube, Map<Long, Long> recommendCuboids, ++ CubeSegment... 
optimizedSegments) throws IOException { ++ try (AutoLock lock = cubeMapLock.lockForWrite()) { ++ segAssist.promoteCheckpointOptimizeSegments(cube, recommendCuboids, optimizedSegments); ++ } ++ } ++ + public List<CubeSegment> calculateHoles(String cubeName) { + return segAssist.calculateHoles(cubeName); } - public void removeCubeLocal(String cubeName) { - CubeInstance cube = cubeMap.get(cubeName); - if (cube != null) { - cubeMap.removeLocal(cubeName); - for (CubeSegment segment : cube.getSegments()) { - usedStorageLocation.remove(segment.getUuid()); + private class SegmentAssist { + + CubeSegment appendSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange, + Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd) + throws IOException { + CubeInstance cubeCopy = cube.latestCopyForWrite(); // get a latest copy + + checkInputRanges(tsRange, segRange); + checkBuildingSegment(cubeCopy); + + // fix start/end a bit + if (cubeCopy.getModel().getPartitionDesc().isPartitioned()) { + // if missing start, set it to where last time ends - CubeSegment last = cubeCopy.getLastSegment(); - if (last != null && !last.isOffsetCube() && tsRange.start.v == 0) { - tsRange = new TSRange(last.getTSRange().end.v, tsRange.end.v); ++ if (tsRange != null && tsRange.start.v == 0) { ++ CubeDesc cubeDesc = cubeCopy.getDescriptor(); ++ CubeSegment last = cubeCopy.getLastSegment(); ++ if (last == null) ++ tsRange = new TSRange(cubeDesc.getPartitionDateStart(), tsRange.end.v); ++ else if (!last.isOffsetCube()) ++ tsRange = new TSRange(last.getTSRange().end.v, tsRange.end.v); + } + } else { + // full build + tsRange = null; + segRange = null; } - Cuboid.clearCache(cube); + + CubeSegment newSegment = newSegment(cubeCopy, tsRange, segRange); + newSegment.setSourcePartitionOffsetStart(sourcePartitionOffsetStart); + newSegment.setSourcePartitionOffsetEnd(sourcePartitionOffsetEnd); + validateNewSegments(cubeCopy, newSegment); + + CubeUpdate update = new CubeUpdate(cubeCopy); + update.setToAddSegs(newSegment); + updateCube(update); + return newSegment; } - } - public LookupStringTable getLookupTable(CubeSegment cubeSegment, JoinDesc join) { + public CubeSegment refreshSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange) + throws IOException { + CubeInstance cubeCopy = cube.latestCopyForWrite(); // get a latest copy - String tableName = join.getPKSide().getTableIdentity(); - String[] pkCols = join.getPrimaryKey(); - String snapshotResPath = cubeSegment.getSnapshotResPath(tableName); - if (snapshotResPath == null) - throw new IllegalStateException("No snapshot for table '" + tableName + "' found on cube segment" - + cubeSegment.getCubeInstance().getName() + "/" + cubeSegment); + checkInputRanges(tsRange, segRange); + checkBuildingSegment(cubeCopy); - try { - SnapshotTable snapshot = getSnapshotManager().getSnapshotTable(snapshotResPath); - TableDesc tableDesc = getTableManager().getTableDesc(tableName, cubeSegment.getProject()); - return new LookupStringTable(tableDesc, pkCols, snapshot); - } catch (IOException e) { - throw new IllegalStateException( - "Failed to load lookup table " + tableName + " from snapshot " + snapshotResPath, e); + if (cubeCopy.getModel().getPartitionDesc().isPartitioned() == false) { + // full build + tsRange = null; + segRange = null; + } + + CubeSegment newSegment = newSegment(cubeCopy, tsRange, segRange); + + Pair<Boolean, Boolean> pair = cubeCopy.getSegments().fitInSegments(newSegment); + if (pair.getFirst() == false || pair.getSecond() == 
false) + throw new IllegalArgumentException("The new refreshing segment " + newSegment + + " does not match any existing segment in cube " + cubeCopy); + + if (segRange != null) { + CubeSegment toRefreshSeg = null; + for (CubeSegment cubeSegment : cubeCopy.getSegments()) { + if (cubeSegment.getSegRange().equals(segRange)) { + toRefreshSeg = cubeSegment; + break; + } + } + + if (toRefreshSeg == null) { + throw new IllegalArgumentException( + "For streaming cube, only one segment can be refreshed at one time"); + } + + newSegment.setSourcePartitionOffsetStart(toRefreshSeg.getSourcePartitionOffsetStart()); + newSegment.setSourcePartitionOffsetEnd(toRefreshSeg.getSourcePartitionOffsetEnd()); + } + + CubeUpdate update = new CubeUpdate(cubeCopy); + update.setToAddSegs(newSegment); + updateCube(update); + + return newSegment; } - } - private CubeSegment newSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange) { - CubeSegment segment = new CubeSegment(); - segment.setUuid(UUID.randomUUID().toString()); - segment.setName(CubeSegment.makeSegmentName(tsRange, segRange)); - segment.setCreateTimeUTC(System.currentTimeMillis()); - segment.setCubeInstance(cube); - - // let full build range be backward compatible - if (tsRange == null && segRange == null) - tsRange = new TSRange(0L, Long.MAX_VALUE); - - segment.setTSRange(tsRange); - segment.setSegRange(segRange); - segment.setStatus(SegmentStatusEnum.NEW); - segment.setStorageLocationIdentifier(generateStorageLocation()); ++ public CubeSegment[] optimizeSegments(CubeInstance cube, Set<Long> cuboidsRecommend) throws IOException { ++ checkReadyForOptimize(cube); + - segment.setCubeInstance(cube); ++ List<CubeSegment> readySegments = cube.getSegments(SegmentStatusEnum.READY); ++ CubeSegment[] optimizeSegments = new CubeSegment[readySegments.size()]; ++ int i = 0; ++ for (CubeSegment segment : readySegments) { ++ CubeSegment newSegment = newSegment(cube, segment.getTSRange(), null); ++ validateNewSegments(cube, newSegment); + - segment.validate(); - return segment; - } ++ optimizeSegments[i++] = newSegment; ++ } + - @VisibleForTesting - /*private*/ String generateStorageLocation() { - String namePrefix = config.getHBaseTableNamePrefix(); - String namespace = config.getHBaseStorageNameSpace(); - String tableName = ""; - Random ran = new Random(); - do { - StringBuffer sb = new StringBuffer(); - if ((namespace.equals("default") || namespace.equals("")) == false) { - sb.append(namespace).append(":"); ++ CubeUpdate update = new CubeUpdate(cube); ++ update.setCuboidsRecommend(cuboidsRecommend); ++ update.setToAddSegs(optimizeSegments); ++ updateCube(update); ++ ++ return optimizeSegments; ++ } ++ + public CubeSegment mergeSegments(CubeInstance cube, TSRange tsRange, SegmentRange segRange, boolean force) + throws IOException { + CubeInstance cubeCopy = cube.latestCopyForWrite(); // get a latest copy + + if (cubeCopy.getSegments().isEmpty()) + throw new IllegalArgumentException("Cube " + cubeCopy + " has no segments"); + + checkInputRanges(tsRange, segRange); + checkBuildingSegment(cubeCopy); + checkCubeIsPartitioned(cubeCopy); + + if (cubeCopy.getSegments().getFirstSegment().isOffsetCube()) { + // offset cube, merge by date range? 
+ if (segRange == null && tsRange != null) { + Pair<CubeSegment, CubeSegment> pair = cubeCopy.getSegments(SegmentStatusEnum.READY) + .findMergeOffsetsByDateRange(tsRange, Long.MAX_VALUE); + if (pair == null) + throw new IllegalArgumentException( + "Find no segments to merge by " + tsRange + " for cube " + cubeCopy); + segRange = new SegmentRange(pair.getFirst().getSegRange().start, + pair.getSecond().getSegRange().end); + } + tsRange = null; + Preconditions.checkArgument(segRange != null); + } else { + segRange = null; + Preconditions.checkArgument(tsRange != null); } - sb.append(namePrefix); - for (int i = 0; i < HBASE_TABLE_LENGTH; i++) { - sb.append(ALPHA_NUM.charAt(ran.nextInt(ALPHA_NUM.length()))); + + CubeSegment newSegment = newSegment(cubeCopy, tsRange, segRange); + + Segments<CubeSegment> mergingSegments = cubeCopy.getMergingSegments(newSegment); + if (mergingSegments.size() <= 1) + throw new IllegalArgumentException("Range " + newSegment.getSegRange() + + " must contain at least 2 segments, but there is " + mergingSegments.size()); + + CubeSegment first = mergingSegments.get(0); + CubeSegment last = mergingSegments.get(mergingSegments.size() - 1); + if (force == false) { + for (int i = 0; i < mergingSegments.size() - 1; i++) { + if (!mergingSegments.get(i).getSegRange().connects(mergingSegments.get(i + 1).getSegRange())) + throw new IllegalStateException("Merging segments must not have gaps between " + + mergingSegments.get(i) + " and " + mergingSegments.get(i + 1)); + } + } + if (first.isOffsetCube()) { + newSegment.setSegRange(new SegmentRange(first.getSegRange().start, last.getSegRange().end)); + newSegment.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetStart()); + newSegment.setSourcePartitionOffsetEnd(last.getSourcePartitionOffsetEnd()); + newSegment.setTSRange(null); + } else { + newSegment.setTSRange(new TSRange(mergingSegments.getTSStart(), mergingSegments.getTSEnd())); + newSegment.setSegRange(null); } - tableName = sb.toString(); - } while (this.usedStorageLocation.containsValue(tableName)); - return tableName; - } - public void promoteNewlyBuiltSegments(CubeInstance cube, CubeSegment newSegment) throws IOException { - if (StringUtils.isBlank(newSegment.getStorageLocationIdentifier())) - throw new IllegalStateException( - "For cube " + cube + ", segment " + newSegment + " missing StorageLocationIdentifier"); + if (force == false) { + List<String> emptySegment = Lists.newArrayList(); + for (CubeSegment seg : mergingSegments) { + if (seg.getSizeKB() == 0) { + emptySegment.add(seg.getName()); + } + } - if (StringUtils.isBlank(newSegment.getLastBuildJobID())) - throw new IllegalStateException("For cube " + cube + ", segment " + newSegment + " missing LastBuildJobID"); + if (emptySegment.size() > 0) { + throw new IllegalArgumentException( + "Empty cube segment found, couldn't merge unless 'forceMergeEmptySegment' set to true: " + + emptySegment); + } + } - if (isReady(newSegment) == true) { - logger.warn("For cube " + cube + ", segment " + newSegment + " state should be NEW but is READY"); - } + validateNewSegments(cubeCopy, newSegment); - List<CubeSegment> tobe = cube.calculateToBeSegments(newSegment); + CubeUpdate update = new CubeUpdate(cubeCopy); + update.setToAddSegs(newSegment); + updateCube(update); - if (tobe.contains(newSegment) == false) - throw new IllegalStateException( - "For cube " + cube + ", segment " + newSegment + " is expected but not in the tobe " + tobe); + return newSegment; + } - newSegment.setStatus(SegmentStatusEnum.READY); + private 
void checkInputRanges(TSRange tsRange, SegmentRange segRange) { + if (tsRange != null && segRange != null) { + throw new IllegalArgumentException( + "Build or refresh cube segment either by TSRange or by SegmentRange, not both."); + } + } - List<CubeSegment> toRemoveSegs = Lists.newArrayList(); - for (CubeSegment segment : cube.getSegments()) { - if (!tobe.contains(segment)) - toRemoveSegs.add(segment); + private void checkBuildingSegment(CubeInstance cube) { - int maxBuldingSeg = cube.getConfig().getMaxBuildingSegments(); - if (cube.getBuildingSegments().size() >= maxBuldingSeg) { ++ checkBuildingSegment(cube, cube.getConfig().getMaxBuildingSegments()); + } + - logger.info("Promoting cube " + cube + ", new segment " + newSegment + ", to remove segments " + toRemoveSegs); ++ private void checkReadyForOptimize(CubeInstance cube) { ++ checkBuildingSegment(cube, 1); ++ } + - CubeUpdate cubeBuilder = new CubeUpdate(cube); - cubeBuilder.setToRemoveSegs(toRemoveSegs.toArray(new CubeSegment[toRemoveSegs.size()])) - .setToUpdateSegs(newSegment).setStatus(RealizationStatusEnum.READY); - updateCube(cubeBuilder); - } ++ private void checkBuildingSegment(CubeInstance cube, int maxBuildingSeg) { ++ if (cube.getBuildingSegments().size() >= maxBuildingSeg) { + throw new IllegalStateException( + "There is already " + cube.getBuildingSegments().size() + " building segment; "); + } + } - public void promoteNewlyOptimizeSegments(CubeInstance cube, CubeSegment... optimizedSegments) throws IOException { - for (CubeSegment seg : optimizedSegments) { - seg.setStatus(SegmentStatusEnum.READY_PENDING); + private void checkCubeIsPartitioned(CubeInstance cube) { + if (cube.getDescriptor().getModel().getPartitionDesc().isPartitioned() == false) { + throw new IllegalStateException( + "there is no partition date column specified, only full build is supported"); + } } - CubeUpdate cubeBuilder = new CubeUpdate(cube); - cubeBuilder.setToUpdateSegs(optimizedSegments); - updateCube(cubeBuilder); - } + private CubeSegment newSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange) { + DataModelDesc modelDesc = cube.getModel(); - public void promoteCheckpointOptimizeSegments(CubeInstance cube, Map<Long, Long> recommendCuboids, - CubeSegment... 
optimizedSegments) throws IOException { - if (cube.getSegments().size() != optimizedSegments.length * 2) { - throw new IllegalStateException("For cube " + cube - + ", every READY segment should be optimized and all segments should be READY before optimizing"); + CubeSegment segment = new CubeSegment(); + segment.setUuid(UUID.randomUUID().toString()); + segment.setName(CubeSegment.makeSegmentName(tsRange, segRange, modelDesc)); + segment.setCreateTimeUTC(System.currentTimeMillis()); + segment.setCubeInstance(cube); + + // let full build range be backward compatible + if (tsRange == null && segRange == null) + tsRange = new TSRange(0L, Long.MAX_VALUE); + + segment.setTSRange(tsRange); + segment.setSegRange(segRange); + segment.setStatus(SegmentStatusEnum.NEW); + segment.setStorageLocationIdentifier(generateStorageLocation()); + + segment.setCubeInstance(cube); + + segment.validate(); + return segment; } - CubeSegment[] originalSegments = new CubeSegment[optimizedSegments.length]; - int i = 0; - for (CubeSegment seg : optimizedSegments) { - originalSegments[i++] = cube.getOriginalSegmentToOptimize(seg); - if (StringUtils.isBlank(seg.getStorageLocationIdentifier())) + public void promoteNewlyBuiltSegments(CubeInstance cube, CubeSegment newSegment) throws IOException { + // work on copy instead of cached objects + CubeInstance cubeCopy = cube.latestCopyForWrite(); // get a latest copy + CubeSegment newSegCopy = cubeCopy.getSegmentById(newSegment.getUuid()); + + if (StringUtils.isBlank(newSegCopy.getStorageLocationIdentifier())) throw new IllegalStateException( - "For cube " + cube + ", segment " + seg + " missing StorageLocationIdentifier"); + "For cube " + cubeCopy + ", segment " + newSegCopy + " missing StorageLocationIdentifier"); - if (StringUtils.isBlank(seg.getLastBuildJobID())) - throw new IllegalStateException("For cube " + cube + ", segment " + seg + " missing LastBuildJobID"); + if (StringUtils.isBlank(newSegCopy.getLastBuildJobID())) + throw new IllegalStateException( + "For cube " + cubeCopy + ", segment " + newSegCopy + " missing LastBuildJobID"); - seg.setStatus(SegmentStatusEnum.READY); - } + if (isReady(newSegCopy) == true) { + logger.warn("For cube " + cubeCopy + ", segment " + newSegCopy + " state should be NEW but is READY"); + } - logger.info("Promoting cube " + cube + ", new segments " + Arrays.toString(optimizedSegments) - + ", to remove segments " + originalSegments); + List<CubeSegment> tobe = cubeCopy.calculateToBeSegments(newSegCopy); - CubeUpdate cubeBuilder = new CubeUpdate(cube); - cubeBuilder.setToRemoveSegs(originalSegments) // - .setToUpdateSegs(optimizedSegments) // - .setStatus(RealizationStatusEnum.READY) // - .setCuboids(recommendCuboids) // - .setCuboidsRecommend(Sets.<Long> newHashSet()); - updateCube(cubeBuilder); - } + if (tobe.contains(newSegCopy) == false) + throw new IllegalStateException("For cube " + cubeCopy + ", segment " + newSegCopy + + " is expected but not in the tobe " + tobe); - public void validateNewSegments(CubeInstance cube, CubeSegment newSegments) { - List<CubeSegment> tobe = cube.calculateToBeSegments(newSegments); - List<CubeSegment> newList = Arrays.asList(newSegments); - if (tobe.containsAll(newList) == false) { - throw new IllegalStateException("For cube " + cube + ", the new segments " + newList - + " do not fit in its current " + cube.getSegments() + "; the resulted tobe is " + tobe); - } - } + newSegCopy.setStatus(SegmentStatusEnum.READY); - private boolean isReady(CubeSegment seg) { - return seg.getStatus() == 
SegmentStatusEnum.READY; - } + List<CubeSegment> toRemoveSegs = Lists.newArrayList(); + for (CubeSegment segment : cubeCopy.getSegments()) { + if (!tobe.contains(segment)) + toRemoveSegs.add(segment); + } - private void loadAllCubeInstance() throws IOException { - ResourceStore store = getStore(); - List<String> paths = store.collectResourceRecursively(ResourceStore.CUBE_RESOURCE_ROOT, ".json"); + logger.info("Promoting cube " + cubeCopy + ", new segment " + newSegCopy + ", to remove segments " + + toRemoveSegs); - logger.info("Loading Cube from folder " + store.getReadableResourcePath(ResourceStore.CUBE_RESOURCE_ROOT)); + CubeUpdate update = new CubeUpdate(cubeCopy); + update.setToRemoveSegs(toRemoveSegs.toArray(new CubeSegment[toRemoveSegs.size()])) + .setToUpdateSegs(newSegCopy).setStatus(RealizationStatusEnum.READY); + updateCube(update); + } - public void validateNewSegments(CubeInstance cube, CubeSegment newSegments) { - int succeed = 0; - int fail = 0; - for (String path : paths) { - CubeInstance cube = reloadCubeLocalAt(path); - if (cube == null) { - fail++; - } else { - succeed++; ++ public void promoteNewlyOptimizeSegments(CubeInstance cube, CubeSegment... optimizedSegments) throws IOException { ++ CubeInstance cubeCopy = cube.latestCopyForWrite(); ++ CubeSegment[] segCopy = cube.regetSegments(optimizedSegments); ++ ++ for (CubeSegment seg : segCopy) { ++ seg.setStatus(SegmentStatusEnum.READY_PENDING); + } ++ ++ CubeUpdate update = new CubeUpdate(cubeCopy); ++ update.setToUpdateSegs(segCopy); ++ updateCube(update); + } + - logger.info("Loaded " + succeed + " cubes, fail on " + fail + " cubes"); - } ++ public void promoteCheckpointOptimizeSegments(CubeInstance cube, Map<Long, Long> recommendCuboids, ++ CubeSegment... optimizedSegments) throws IOException { ++ CubeInstance cubeCopy = cube.latestCopyForWrite(); ++ CubeSegment[] optSegCopy = cubeCopy.regetSegments(optimizedSegments); ++ ++ if (cubeCopy.getSegments().size() != optSegCopy.length * 2) { ++ throw new IllegalStateException("For cube " + cubeCopy ++ + ", every READY segment should be optimized and all segments should be READY before optimizing"); ++ } ++ ++ CubeSegment[] originalSegments = new CubeSegment[optSegCopy.length]; ++ int i = 0; ++ for (CubeSegment seg : optSegCopy) { ++ originalSegments[i++] = cubeCopy.getOriginalSegmentToOptimize(seg); + - private CubeInstance reloadCubeLocalAt(String path) { - ResourceStore store = getStore(); - CubeInstance cube; ++ if (StringUtils.isBlank(seg.getStorageLocationIdentifier())) ++ throw new IllegalStateException( ++ "For cube " + cubeCopy + ", segment " + seg + " missing StorageLocationIdentifier"); + - try { - cube = store.getResource(path, CubeInstance.class, CUBE_SERIALIZER); - checkNotNull(cube, "cube (at %s) not found", path); - - String cubeName = cube.getName(); - checkState(StringUtils.isNotBlank(cubeName), "cube (at %s) name must not be blank", path); - - CubeDesc cubeDesc = CubeDescManager.getInstance(config).getCubeDesc(cube.getDescName()); - checkNotNull(cubeDesc, "cube descriptor '%s' (for cube '%s') not found", cube.getDescName(), cubeName); - if (!isSpecialTestCube(cubeName)) - checkState(cubeDesc.getName().equals(cubeName), - "cube name '%s' must be same as descriptor name '%s', but it is not", cubeName, - cubeDesc.getName()); - - if (!cubeDesc.getError().isEmpty()) { - cube.setStatus(RealizationStatusEnum.DESCBROKEN); - logger.error("cube descriptor {} (for cube '{}') is broken", cubeDesc.getResourcePath(), cubeName); - for (String error : cubeDesc.getError()) 
{ - logger.error("Error: {}", error); - } - } else if (cube.getStatus() == RealizationStatusEnum.DESCBROKEN) { - cube.setStatus(RealizationStatusEnum.DISABLED); - logger.info("cube {} changed from DESCBROKEN to DISABLED", cubeName); ++ if (StringUtils.isBlank(seg.getLastBuildJobID())) ++ throw new IllegalStateException("For cube " + cubeCopy + ", segment " + seg + " missing LastBuildJobID"); ++ ++ seg.setStatus(SegmentStatusEnum.READY); + } + - cube.setConfig((KylinConfigExt) cubeDesc.getConfig()); - cubeMap.putLocal(cubeName, cube); ++ logger.info("Promoting cube " + cubeCopy + ", new segments " + Arrays.toString(optSegCopy) ++ + ", to remove segments " + originalSegments); ++ ++ CubeUpdate update = new CubeUpdate(cubeCopy); ++ update.setToRemoveSegs(originalSegments) // ++ .setToUpdateSegs(optSegCopy) // ++ .setStatus(RealizationStatusEnum.READY) // ++ .setCuboids(recommendCuboids) // ++ .setCuboidsRecommend(Sets.<Long> newHashSet()); ++ updateCube(update); ++ } + - for (CubeSegment segment : cube.getSegments()) { - usedStorageLocation.put(segment.getUuid(), segment.getStorageLocationIdentifier()); ++ private void validateNewSegments(CubeInstance cube, CubeSegment newSegments) { + List<CubeSegment> tobe = cube.calculateToBeSegments(newSegments); + List<CubeSegment> newList = Arrays.asList(newSegments); + if (tobe.containsAll(newList) == false) { + throw new IllegalStateException("For cube " + cube + ", the new segments " + newList + + " do not fit in its current " + cube.getSegments() + "; the resulted tobe is " + tobe); } + } - logger.info("Reloaded cube {} being {} having {} segments", cubeName, cube, cube.getSegments().size()); - return cube; + /** + * Calculate the holes (gaps) in segments. + * @param cubeName + * @return + */ + public List<CubeSegment> calculateHoles(String cubeName) { + List<CubeSegment> holes = Lists.newArrayList(); + final CubeInstance cube = getCube(cubeName); + DataModelDesc modelDesc = cube.getModel(); + Preconditions.checkNotNull(cube); + final List<CubeSegment> segments = cube.getSegments(); + logger.info("totally " + segments.size() + " cubeSegments"); + if (segments.size() == 0) { + return holes; + } - } catch (Exception e) { - logger.error("Error during load cube instance, skipping : " + path, e); - return null; + Collections.sort(segments); + for (int i = 0; i < segments.size() - 1; ++i) { + CubeSegment first = segments.get(i); + CubeSegment second = segments.get(i + 1); + if (first.getSegRange().connects(second.getSegRange())) + continue; + + if (first.getSegRange().apartBefore(second.getSegRange())) { + CubeSegment hole = new CubeSegment(); + hole.setCubeInstance(cube); + if (first.isOffsetCube()) { + hole.setSegRange(new SegmentRange(first.getSegRange().end, second.getSegRange().start)); + hole.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetEnd()); + hole.setSourcePartitionOffsetEnd(second.getSourcePartitionOffsetStart()); + hole.setName(CubeSegment.makeSegmentName(null, hole.getSegRange(), modelDesc)); + } else { + hole.setTSRange(new TSRange(first.getTSRange().end.v, second.getTSRange().start.v)); + hole.setName(CubeSegment.makeSegmentName(hole.getTSRange(), null, modelDesc)); + } + holes.add(hole); + } + } + return holes; } - } - private boolean isSpecialTestCube(String cubeName) { - return cubeName.equals("kylin_sales_cube") // - || config.isDevEnv() - && (cubeName.startsWith("test_kylin_cube") || cubeName.startsWith("test_streaming")); } - private TableMetadataManager getTableManager() { - return 
TableMetadataManager.getInstance(config); - } - - private DictionaryManager getDictionaryManager() { - return DictionaryManager.getInstance(config); + // ============================================================================ + // Dictionary/Snapshot related methods + // ============================================================================ + + public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable) + throws IOException { + return dictAssist.buildDictionary(cubeSeg, col, inpTable); } - private SnapshotManager getSnapshotManager() { - return SnapshotManager.getInstance(config); + public DictionaryInfo saveDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable, + Dictionary<String> dict) throws IOException { + return dictAssist.saveDictionary(cubeSeg, col, inpTable, dict); } - private ResourceStore getStore() { - return ResourceStore.getStore(this.config); + /** + * return null if no dictionary for given column + */ + public Dictionary<String> getDictionary(CubeSegment cubeSeg, TblColRef col) { + return dictAssist.getDictionary(cubeSeg, col); } - @Override - public RealizationType getRealizationType() { - return RealizationType.CUBE; + public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable) throws IOException { + return dictAssist.buildSnapshotTable(cubeSeg, lookupTable); } - @Override - public IRealization getRealization(String name) { - return getCube(name); + public LookupStringTable getLookupTable(CubeSegment cubeSegment, JoinDesc join) { + return dictAssist.getLookupTable(cubeSegment, join); } - //UHC (ultra high cardinality column): contain the ShardByColumns and the GlobalDictionaryColumns - public int[] getUHCIndex(CubeDesc cubeDesc) throws IOException { - return dictAssist.getUHCIndex(cubeDesc); - } - - // ============================================================================ + private class DictionaryAssist { + public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable) + throws IOException { + CubeDesc cubeDesc = cubeSeg.getCubeDesc(); + if (!cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col)) + return null; - public List<TblColRef> getAllGlobalDictColumns(CubeDesc cubeDesc) { - List<TblColRef> globalDictCols = new ArrayList<TblColRef>(); - List<DictionaryDesc> dictionaryDescList = cubeDesc.getDictionaries(); + String builderClass = cubeDesc.getDictionaryBuilderClass(col); + DictionaryInfo dictInfo = getDictionaryManager().buildDictionary(col, inpTable, builderClass); - if (dictionaryDescList == null) { - return globalDictCols; + saveDictionaryInfo(cubeSeg, col, dictInfo); + return dictInfo; } - for (DictionaryDesc dictionaryDesc : dictionaryDescList) { - if (dictionaryDesc.getBuilderClass() != null) { - globalDictCols.add(dictionaryDesc.getColumnRef()); - } + public DictionaryInfo saveDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable, + Dictionary<String> dict) throws IOException { + CubeDesc cubeDesc = cubeSeg.getCubeDesc(); + if (!cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col)) + return null; + + DictionaryInfo dictInfo = getDictionaryManager().saveDictionary(col, inpTable, dict); + + saveDictionaryInfo(cubeSeg, col, dictInfo); + return dictInfo; } - return globalDictCols; - } - //UHC (ultra high cardinality column): contain the ShardByColumns and the GlobalDictionaryColumns - public List<TblColRef> getAllUHCColumns(CubeDesc cubeDesc) { - List<TblColRef> uhcColumns = new ArrayList<TblColRef>(); - 
uhcColumns.addAll(getAllGlobalDictColumns(cubeDesc)); - uhcColumns.addAll(cubeDesc.getShardByColumns()); - return uhcColumns; - } + private void saveDictionaryInfo(CubeSegment cubeSeg, TblColRef col, DictionaryInfo dictInfo) + throws IOException { + if (dictInfo == null) + return; - public int[] getUHCIndex(CubeDesc cubeDesc) throws IOException { - List<TblColRef> factDictCols = Lists.newArrayList(cubeDesc.getAllColumnsNeedDictionaryBuilt()); - List<TblColRef> uhcColumns = getAllUHCColumns(cubeDesc); - int[] uhcIndex = new int[factDictCols.size()]; + // work on copy instead of cached objects + CubeInstance cubeCopy = cubeSeg.getCubeInstance().latestCopyForWrite(); // get a latest copy + CubeSegment segCopy = cubeCopy.getSegmentById(cubeSeg.getUuid()); - for (int i = 0; i < factDictCols.size(); i++) { - if (uhcColumns.contains(factDictCols.get(i))) { - uhcIndex[i] = 1; + Dictionary<?> dict = dictInfo.getDictionaryObject(); + segCopy.putDictResPath(col, dictInfo.getResourcePath()); + segCopy.getRowkeyStats().add(new Object[] { col.getIdentity(), dict.getSize(), dict.getSizeOfId() }); + + CubeUpdate update = new CubeUpdate(cubeCopy); + update.setToUpdateSegs(segCopy); + updateCube(update); + } + + /** + * return null if no dictionary for given column + */ + @SuppressWarnings("unchecked") + public Dictionary<String> getDictionary(CubeSegment cubeSeg, TblColRef col) { + DictionaryInfo info = null; + try { + DictionaryManager dictMgr = getDictionaryManager(); + String dictResPath = cubeSeg.getDictResPath(col); + if (dictResPath == null) + return null; + + info = dictMgr.getDictionaryInfo(dictResPath); + if (info == null) + throw new IllegalStateException("No dictionary found by " + dictResPath + + ", invalid cube state; cube segment" + cubeSeg + ", col " + col); + } catch (IOException e) { + throw new IllegalStateException("Failed to get dictionary for cube segment" + cubeSeg + ", col" + col, + e); } + return (Dictionary<String>) info.getDictionaryObject(); } - return uhcIndex; - } + public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable) throws IOException { + // work on copy instead of cached objects + CubeInstance cubeCopy = cubeSeg.getCubeInstance().latestCopyForWrite(); // get a latest copy + CubeSegment segCopy = cubeCopy.getSegmentById(cubeSeg.getUuid()); - /** - * Calculate the holes (gaps) in segments. 
- * @param cubeName - * @return - */ - public List<CubeSegment> calculateHoles(String cubeName) { - List<CubeSegment> holes = Lists.newArrayList(); - final CubeInstance cube = getCube(cubeName); - Preconditions.checkNotNull(cube); - final List<CubeSegment> segments = cube.getSegments(); - logger.info("totally " + segments.size() + " cubeSegments"); - if (segments.size() == 0) { - return holes; + TableMetadataManager metaMgr = getTableManager(); + SnapshotManager snapshotMgr = getSnapshotManager(); + + TableDesc tableDesc = new TableDesc(metaMgr.getTableDesc(lookupTable, segCopy.getProject())); + IReadableTable hiveTable = SourceFactory.createReadableTable(tableDesc); + SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc); + + segCopy.putSnapshotResPath(lookupTable, snapshot.getResourcePath()); + CubeUpdate update = new CubeUpdate(cubeCopy); + update.setToUpdateSegs(segCopy); + updateCube(update); + + return snapshot; } - Collections.sort(segments); - for (int i = 0; i < segments.size() - 1; ++i) { - CubeSegment first = segments.get(i); - CubeSegment second = segments.get(i + 1); - if (first.getSegRange().connects(second.getSegRange())) - continue; - - if (first.getSegRange().apartBefore(second.getSegRange())) { - CubeSegment hole = new CubeSegment(); - hole.setCubeInstance(cube); - if (first.isOffsetCube()) { - hole.setSegRange(new SegmentRange(first.getSegRange().end, second.getSegRange().start)); - hole.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetEnd()); - hole.setSourcePartitionOffsetEnd(second.getSourcePartitionOffsetStart()); - hole.setName(CubeSegment.makeSegmentName(null, hole.getSegRange())); - } else { - hole.setTSRange(new TSRange(first.getTSRange().end.v, second.getTSRange().start.v)); - hole.setName(CubeSegment.makeSegmentName(hole.getTSRange(), null)); - } - holes.add(hole); + public LookupStringTable getLookupTable(CubeSegment cubeSegment, JoinDesc join) { + + String tableName = join.getPKSide().getTableIdentity(); + String[] pkCols = join.getPrimaryKey(); + String snapshotResPath = cubeSegment.getSnapshotResPath(tableName); + if (snapshotResPath == null) + throw new IllegalStateException("No snapshot for table '" + tableName + "' found on cube segment" + + cubeSegment.getCubeInstance().getName() + "/" + cubeSegment); + + try { + SnapshotTable snapshot = getSnapshotManager().getSnapshotTable(snapshotResPath); + TableDesc tableDesc = getTableManager().getTableDesc(tableName, cubeSegment.getProject()); + return new LookupStringTable(tableDesc, pkCols, snapshot); + } catch (IOException e) { + throw new IllegalStateException( + "Failed to load lookup table " + tableName + " from snapshot " + snapshotResPath, e); } } - - private final String GLOBAL_DICTIONNARY_CLASS = "org.apache.kylin.dict.GlobalDictionaryBuilder"; - - //UHC (ultra high cardinality column): contain the ShardByColumns and the GlobalDictionaryColumns - public int[] getUHCIndex(CubeDesc cubeDesc) throws IOException { - List<TblColRef> dictCols = Lists.newArrayList(cubeDesc.getAllColumnsNeedDictionaryBuilt()); - int[] uhcIndex = new int[dictCols.size()]; - - //add GlobalDictionaryColumns - List<DictionaryDesc> dictionaryDescList = cubeDesc.getDictionaries(); - if (dictionaryDescList != null) { - for (DictionaryDesc dictionaryDesc : dictionaryDescList) { - if (dictionaryDesc.getBuilderClass() != null - && dictionaryDesc.getBuilderClass().equalsIgnoreCase(GLOBAL_DICTIONNARY_CLASS)) { - for (int i = 0; i < dictCols.size(); i++) { - if 
(dictCols.get(i).equals(dictionaryDesc.getColumnRef())) { - uhcIndex[i] = 1; - break; - } - } - } - } - } - - //add ShardByColumns - Set<TblColRef> shardByColumns = cubeDesc.getShardByColumns(); - for (int i = 0; i < dictCols.size(); i++) { - if (shardByColumns.contains(dictCols.get(i))) { - uhcIndex[i] = 1; - } - } - - return uhcIndex; - } - return holes; } }
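Note on the CubeManager.java changes above: calculateHoles(), now delegated to the SegmentAssist inner class, sorts the cube's segments and emits a placeholder segment for every gap between two neighbours whose ranges do not connect. The following is a minimal, standalone sketch of that gap-detection idea only; the Range and HoleSketch names are hypothetical stand-ins, not Kylin's CubeSegment/TSRange API.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Standalone sketch: find the gaps ("holes") between sorted, non-overlapping
// half-open time ranges, mirroring the idea behind CubeManager.calculateHoles().
// Range is a hypothetical stand-in; Kylin works on CubeSegment/TSRange instead.
public class HoleSketch {

    static final class Range {
        final long start;
        final long end; // half-open: [start, end)

        Range(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + "," + end + ")";
        }
    }

    static List<Range> calculateHoles(List<Range> segments) {
        List<Range> holes = new ArrayList<>();
        List<Range> sorted = new ArrayList<>(segments);
        sorted.sort(Comparator.comparingLong((Range r) -> r.start));
        for (int i = 0; i < sorted.size() - 1; i++) {
            Range first = sorted.get(i);
            Range second = sorted.get(i + 1);
            if (first.end >= second.start)
                continue; // ranges connect (or overlap), no hole between them
            holes.add(new Range(first.end, second.start)); // the gap becomes a "hole"
        }
        return holes;
    }

    public static void main(String[] args) {
        List<Range> segs = List.of(new Range(0L, 1000L), new Range(2000L, 4000L), new Range(4000L, 8000L));
        System.out.println(calculateHoles(segs)); // prints [[1000,2000)]
    }
}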
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java
----------------------------------------------------------------------
diff --cc core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java
index 1d4e722,2e1d652..378d082
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java
@@@ -34,9 -35,10 +35,10 @@@ public class CubeUpdate
      private String owner;
      private int cost = -1;
      private Map<Long, Long> cuboids = null;
+     private Set<Long> cuboidsRecommend = null;
  
      public CubeUpdate(CubeInstance cubeInstance) {
-         this.cubeInstance = cubeInstance;
+         setCubeInstance(cubeInstance);
      }
  
      public CubeInstance getCubeInstance() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
----------------------------------------------------------------------
diff --cc core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
index 1110a5d,d9c7803..7d332bb
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
@@@ -52,16 -56,44 +52,22 @@@ public class Cuboid implements Comparab
          }
      };
  
-     // this is the only entry point for query to find the right cuboid for a segment
-     public static Cuboid identifyCuboid(CubeSegment cubeSegment, Set<TblColRef> dimensions,
-             Collection<FunctionDesc> metrics) {
-         return identifyCuboid(cubeSegment.getCuboidScheduler(), dimensions, metrics);
-     }
- 
-     // this is the only entry point for query to find the right cuboid for a cube instance
-     public static Cuboid identifyCuboid(CubeInstance cubeInstance, Set<TblColRef> dimensions,
-             Collection<FunctionDesc> metrics) {
-         return identifyCuboid(cubeInstance.getCuboidScheduler(), dimensions, metrics);
++    // for mandatory cuboid, no need to translate cuboid
++    public static Cuboid findForMandatory(CubeDesc cube, long cuboidID) {
++        return new Cuboid(cube, cuboidID, cuboidID);
+     }
- 
-     public static Cuboid identifyCuboid(CuboidScheduler cuboidScheduler, Set<TblColRef> dimensions,
++
+     public static Cuboid findCuboid(CuboidScheduler cuboidScheduler, Set<TblColRef> dimensions,
              Collection<FunctionDesc> metrics) {
-         long cuboidID = identifyCuboidId(cuboidScheduler.getCubeDesc(), dimensions, metrics);
+         long cuboidID = toCuboidId(cuboidScheduler.getCubeDesc(), dimensions, metrics);
          return Cuboid.findById(cuboidScheduler, cuboidID);
      }
  
-     public static long identifyCuboidId(CubeDesc cubeDesc, Set<TblColRef> dimensions, Collection<FunctionDesc> metrics) {
-         for (FunctionDesc metric : metrics) {
-             if (metric.getMeasureType().onlyAggrInBaseCuboid())
-                 return Cuboid.getBaseCuboidId(cubeDesc);
-         }
- 
-         long cuboidID = 0;
-         for (TblColRef column : dimensions) {
-             int index = cubeDesc.getRowkey().getColumnBitIndex(column);
-             cuboidID |= 1L << index;
-         }
-         return cuboidID;
-     }
- 
-     // for mandatory cuboid, no need to translate cuboid
-     public static Cuboid findForMandatory(CubeDesc cube, long cuboidID) {
-         return new Cuboid(cube, cuboidID, cuboidID);
+     public static Cuboid findById(CuboidScheduler cuboidScheduler, byte[] cuboidID) {
+         return findById(cuboidScheduler, Bytes.toLong(cuboidID));
      }
  
+     @Deprecated
      public static Cuboid findById(CubeSegment cubeSegment, long cuboidID) {
          return findById(cubeSegment.getCuboidScheduler(), cuboidID);
      }
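On the Cuboid.java change above: the renamed findCuboid()/toCuboidId() path resolves a cuboid by OR-ing one bit per selected dimension, positioned at the dimension's rowkey column index. Below is a small standalone illustration of that bitmask computation under simplified assumptions; the string-keyed index map stands in for CubeDesc.getRowkey().getColumnBitIndex(), and the column names are only examples.

import java.util.Map;
import java.util.Set;

// Standalone sketch of the cuboid-id bitmask idea behind Cuboid.toCuboidId():
// each selected dimension sets one bit, positioned by its rowkey column index.
// The name-to-index map is a stand-in for Kylin's RowKeyDesc lookup.
public class CuboidIdSketch {

    static long toCuboidId(Map<String, Integer> rowkeyBitIndex, Set<String> dimensions) {
        long cuboidId = 0L;
        for (String dim : dimensions) {
            cuboidId |= 1L << rowkeyBitIndex.get(dim);
        }
        return cuboidId;
    }

    public static void main(String[] args) {
        Map<String, Integer> index = Map.of("CAL_DT", 0, "LSTG_FORMAT_NAME", 1, "LEAF_CATEG_ID", 2);
        // selecting CAL_DT and LEAF_CATEG_ID sets bits 0 and 2, giving cuboid id 5
        System.out.println(toCuboidId(index, Set.of("CAL_DT", "LEAF_CATEG_ID")));
    }
}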
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --cc core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 32578ed,c4e1ced..88a8e43
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@@ -54,6 -54,6 +55,8 @@@ import org.apache.kylin.common.util.Arr
  import org.apache.kylin.common.util.JsonUtil;
  import org.apache.kylin.common.util.Pair;
  import org.apache.kylin.cube.cuboid.CuboidScheduler;
++import org.apache.kylin.dict.GlobalDictionaryBuilder;
++import org.apache.kylin.dict.global.SegmentAppendTrieDictBuilder;
  import org.apache.kylin.measure.MeasureType;
  import org.apache.kylin.measure.extendedcolumn.ExtendedColumnMeasureType;
  import org.apache.kylin.metadata.MetadataConstants;
@@@ -191,12 -186,15 +196,17 @@@ public class CubeDesc extends RootPersi
      @JsonInclude(JsonInclude.Include.NON_NULL)
      private int parentForward = 3;
  
+     @JsonProperty("mandatory_dimension_set_list")
+     @JsonInclude(JsonInclude.Include.NON_NULL)
+     private List<Set<String>> mandatoryDimensionSetList = Collections.emptyList();
+ 
-     private Set<Long> mandatoryCuboids = Sets.newHashSet();
+     // Error messages during resolving json metadata
+     private List<String> errors = new ArrayList<String>();
  
      private LinkedHashSet<TblColRef> allColumns = new LinkedHashSet<>();
      private LinkedHashSet<ColumnDesc> allColumnDescs = new LinkedHashSet<>();
      private LinkedHashSet<TblColRef> dimensionColumns = new LinkedHashSet<>();
++    private Set<Long> mandatoryCuboids = new HashSet<>();
  
      private Map<TblColRef, DeriveInfo> derivedToHostMap = Maps.newHashMap();
      private Map<Array<TblColRef>, List<DeriveInfo>> hostToDerivedMap = Maps.newHashMap();
@@@ -1333,6 -1391,6 +1410,31 @@@
          }
          return null;
      }
++
++    public List<TblColRef> getAllGlobalDictColumns() {
++        List<TblColRef> globalDictCols = new ArrayList<TblColRef>();
++        List<DictionaryDesc> dictionaryDescList = getDictionaries();
++
++        if (dictionaryDescList == null) {
++            return globalDictCols;
++        }
++
++        for (DictionaryDesc dictionaryDesc : dictionaryDescList) {
++            String cls = dictionaryDesc.getBuilderClass();
++            if (GlobalDictionaryBuilder.class.getName().equals(cls) || SegmentAppendTrieDictBuilder.class.getName().equals(cls))
++                globalDictCols.add(dictionaryDesc.getColumnRef());
++        }
++        return globalDictCols;
++    }
++
++    // UHC (ultra high cardinality column): contain the ShardByColumns and the GlobalDictionaryColumns
++    public List<TblColRef> getAllUHCColumns() {
++        List<TblColRef> uhcColumns = new ArrayList<TblColRef>();
++        uhcColumns.addAll(getAllGlobalDictColumns());
++        uhcColumns.addAll(getShardByColumns());
++        return uhcColumns;
++    }
++
      public String getProject() {
          return getModel().getProject();

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-cube/src/test/java/org/apache/kylin/cube/AggregationGroupRuleTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-cube/src/test/java/org/apache/kylin/cube/CubeDescTest.java
----------------------------------------------------------------------
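On the CubeDesc.java change above: getAllUHCColumns() is now simply the union of the global-dictionary columns (those whose dictionary descriptor names GlobalDictionaryBuilder or SegmentAppendTrieDictBuilder as the builder class) and the shard-by columns. A rough standalone sketch of that selection follows; plain strings replace TblColRef/DictionaryDesc, and the column names are invented for illustration.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Standalone sketch of the UHC (ultra high cardinality) column selection moved
// into CubeDesc: UHC = global-dictionary columns + shard-by columns.
// Plain strings stand in for TblColRef and DictionaryDesc.
public class UhcColumnsSketch {

    static final Set<String> GLOBAL_DICT_BUILDERS = Set.of(
            "org.apache.kylin.dict.GlobalDictionaryBuilder",
            "org.apache.kylin.dict.global.SegmentAppendTrieDictBuilder");

    static List<String> getAllUhcColumns(Map<String, String> dictBuilderByColumn, List<String> shardByColumns) {
        List<String> uhcColumns = new ArrayList<>();
        for (Map.Entry<String, String> e : dictBuilderByColumn.entrySet()) {
            if (GLOBAL_DICT_BUILDERS.contains(e.getValue()))
                uhcColumns.add(e.getKey()); // column built with a global dictionary
        }
        uhcColumns.addAll(shardByColumns);
        return uhcColumns;
    }

    public static void main(String[] args) {
        Map<String, String> builders = Map.of("SELLER_ID", "org.apache.kylin.dict.GlobalDictionaryBuilder");
        System.out.println(getAllUhcColumns(builders, List.of("BUYER_ID"))); // [SELLER_ID, BUYER_ID]
    }
}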
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java ---------------------------------------------------------------------- diff --cc core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java index f293472,03dd928..8953868 --- a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java +++ b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java @@@ -308,6 -306,90 +309,82 @@@ public class CubeManagerTest extends Lo } @Test + public void testAutoMergeWithVolatileRange() throws Exception { + CubeManager mgr = CubeManager.getInstance(getTestConfig()); + CubeInstance cube = mgr.getCube("test_kylin_cube_with_slr_empty"); + - cube.getDescriptor().setAutoMergeTimeRanges(new long[] { 2000, 6000 }); - - mgr.updateCube(new CubeUpdate(cube)); ++ CubeDesc desc = cube.getDescriptor(); ++ desc.setAutoMergeTimeRanges(new long[] { 2000, 6000 }); ++ CubeDescManager.getInstance(getTestConfig()).updateCubeDesc(desc); + ++ cube = mgr.getCube(cube.getName()); + assertTrue(cube.needAutoMerge()); + + // no segment at first + assertEquals(0, cube.getSegments().size()); + + // append first + CubeSegment seg1 = mgr.appendSegment(cube, new TSRange(0L, 1000L)); - seg1.setStatus(SegmentStatusEnum.READY); ++ mgr.updateCubeSegStatus(seg1, SegmentStatusEnum.READY); + + CubeSegment seg3 = mgr.appendSegment(cube, new TSRange(2000L, 4000L)); - seg3.setStatus(SegmentStatusEnum.READY); ++ mgr.updateCubeSegStatus(seg3, SegmentStatusEnum.READY); + ++ cube = mgr.getCube(cube.getName()); + assertEquals(2, cube.getSegments().size()); + + SegmentRange mergedSeg = cube.autoMergeCubeSegments(); - + assertTrue(mergedSeg == null); + + assertEquals(2, cube.getSegments().size()); + + // append a new seg - + CubeSegment seg4 = mgr.appendSegment(cube, new TSRange(4000L, 8000L)); - seg4.setStatus(SegmentStatusEnum.READY); ++ mgr.updateCubeSegStatus(seg4, SegmentStatusEnum.READY); + ++ cube = mgr.getCube(cube.getName()); + assertEquals(3, cube.getSegments().size()); + + cube.getDescriptor().setVolatileRange(10000); + + mergedSeg = cube.autoMergeCubeSegments(); - + assertTrue(mergedSeg == null); + + //will merge after change the volatile_range + + cube.getDescriptor().setVolatileRange(0); + + mergedSeg = cube.autoMergeCubeSegments(); - + assertTrue(mergedSeg != null); - + assertTrue((Long) mergedSeg.start.v == 2000 && (Long) mergedSeg.end.v == 8000); + + // fill the gap - + CubeSegment seg2 = mgr.appendSegment(cube, new TSRange(1000L, 2000L)); - seg2.setStatus(SegmentStatusEnum.READY); ++ mgr.updateCubeSegStatus(seg2, SegmentStatusEnum.READY); + ++ cube = mgr.getCube(cube.getName()); + assertEquals(4, cube.getSegments().size()); + + cube.getDescriptor().setVolatileRange(10000); + + mergedSeg = cube.autoMergeCubeSegments(); - + assertTrue(mergedSeg == null); + + //will merge after change the volatile_range + cube.getDescriptor().setVolatileRange(0); + + mergedSeg = cube.autoMergeCubeSegments(); - + assertTrue(mergedSeg != null); - + assertTrue((Long) mergedSeg.start.v == 0 && (Long) mergedSeg.end.v == 8000); + + cube.getDescriptor().setVolatileRange(1000); + + mergedSeg = cube.autoMergeCubeSegments(); - + assertTrue(mergedSeg != null); - + assertTrue((Long) mergedSeg.start.v == 0 && (Long) mergedSeg.end.v == 2000); - + } + + @Test public void testGetCubeNameWithNamespace() { System.setProperty("kylin.storage.hbase.table-name-prefix", "HELLO_"); try { @@@ -317,8 -399,43 +394,40 @@@ } finally { 
System.clearProperty("kylin.storage.hbase.table-name-prefix"); } + + System.setProperty("kylin.storage.hbase.namespace", "MYSPACE"); + try { + CubeManager mgr = CubeManager.getInstance(getTestConfig()); + String tablename = mgr.generateStorageLocation(); + assertTrue(tablename.startsWith("MYSPACE:")); + } finally { + System.clearProperty("kylin.storage.hbase.namespace"); + } } + @Test + public void testBuildCubeWithPartitionStartDate() throws IOException { + Long PARTITION_DATE_START = 1513123200L; + Long FIRST_BUILD_DATE_END = 1514764800L; + Long SECOND_BUILD_DATE_END = 1540339200L; + + KylinConfig config = getTestConfig(); + CubeManager cubeManager = CubeManager.getInstance(config); + CubeInstance cube = cubeManager.getCube("test_kylin_cube_with_slr_empty"); + cube.getDescriptor().setPartitionDateStart(PARTITION_DATE_START); + + CubeSegment segment = cubeManager.appendSegment(cube, new TSRange(0L, FIRST_BUILD_DATE_END), null, null, null); + assertEquals(segment._getDateRangeStart(), PARTITION_DATE_START.longValue()); + assertEquals(segment._getDateRangeEnd(), FIRST_BUILD_DATE_END.longValue()); + - segment.setStatus(SegmentStatusEnum.READY); - CubeUpdate cubeBuilder = new CubeUpdate(cube); - cubeManager.updateCube(cubeBuilder); ++ cubeManager.updateCubeSegStatus(segment, SegmentStatusEnum.READY); + + segment = cubeManager.appendSegment(cube, new TSRange(0L, SECOND_BUILD_DATE_END), null, null, null); + assertEquals(segment._getDateRangeStart(), FIRST_BUILD_DATE_END.longValue()); + assertEquals(segment._getDateRangeEnd(), SECOND_BUILD_DATE_END.longValue()); - + } + + public CubeDescManager getCubeDescManager() { return CubeDescManager.getInstance(getTestConfig()); } http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyDecoderTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyEncoderTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java ---------------------------------------------------------------------- diff --cc core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java index 404d53c,5717542..fd7affd --- a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java @@@ -42,8 -42,7 +42,8 @@@ public class GlobalDictionaryBuilder im private static Logger logger = LoggerFactory.getLogger(GlobalDictionaryBuilder.class); + @Override - public void init(DictionaryInfo dictInfo, int baseId) throws IOException { + public void init(DictionaryInfo dictInfo, int baseId, String hdfsDir) throws IOException { sourceColumn = dictInfo.getSourceTable() + "_" + dictInfo.getSourceColumn(); lock = KylinConfig.getInstanceFromEnv().getDistributedLockFactory().lockForCurrentThread(); lock.lock(getLockPath(sourceColumn), Long.MAX_VALUE); http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryBuilder.java 
---------------------------------------------------------------------- diff --cc core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryBuilder.java index 0934a7d,18bbb07..e2a643d --- a/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryBuilder.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryBuilder.java @@@ -28,7 -28,7 +28,7 @@@ import org.apache.kylin.common.util.Dic public interface IDictionaryBuilder { /** Sets the dictionary info for the dictionary being built. Mainly for GlobalDictionaryBuilder. */ - void init(DictionaryInfo info, int baseId) throws IOException; - void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException; ++ void init(DictionaryInfo info, int baseId, String hdfsWorkingDir) throws IOException; /** Add a new value into dictionary, returns it is accepted (not null) or not. */ boolean addValue(String value); http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-dictionary/src/main/java/org/apache/kylin/dict/Number2BytesConverter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-dictionary/src/main/java/org/apache/kylin/dict/global/SegmentAppendTrieDictBuilder.java ---------------------------------------------------------------------- diff --cc core-dictionary/src/main/java/org/apache/kylin/dict/global/SegmentAppendTrieDictBuilder.java index c8bc13d,e5f2d57..cfc1f98 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/global/SegmentAppendTrieDictBuilder.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/global/SegmentAppendTrieDictBuilder.java @@@ -43,9 -43,9 +43,14 @@@ public class SegmentAppendTrieDictBuild KylinConfig config = KylinConfig.getInstanceFromEnv(); int maxEntriesPerSlice = config.getAppendDictEntrySize(); ++ if (hdfsDir == null) { ++ //build in Kylin job server ++ hdfsDir = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory(); ++ } ++ //use UUID to make each segment dict in different HDFS dir and support concurrent build //use timestamp to make the segment dict easily to delete - String baseDir = config.getHdfsWorkingDirectory() + "resources/SegmentDict" + dictInfo.getResourceDir() + "/" + UUID.randomUUID().toString() + "_" + System.currentTimeMillis()+ "/"; + String baseDir = hdfsDir + "resources/SegmentDict" + dictInfo.getResourceDir() + "/" + UUID.randomUUID().toString() + "_" + System.currentTimeMillis()+ "/"; this.builder = new AppendTrieDictionaryBuilder(baseDir, maxEntriesPerSlice, false); this.baseId = baseId; http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java ---------------------------------------------------------------------- diff --cc core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java index 27678ac,316fc99..7b8e413 --- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java +++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java @@@ -65,7 -67,7 +67,7 @@@ public class JoinedFlatTable } public static String generateCreateTableStatement(IJoinedFlatTableDesc flatDesc, String storageDfsDir, - String format, String fieldDelimiter) { - String storageFormat, String filedDelimiter) { ++ String storageFormat, String fieldDelimiter) { StringBuilder ddl = new StringBuilder(); ddl.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + flatDesc.getTableName() + "\n"); @@@ -79,11 -81,12 +81,12 @@@ ddl.append(colName(col) + " " + getHiveDataType(col.getDatatype()) + 
"\n"); } ddl.append(")" + "\n"); - if ("TEXTFILE".equals(format)) { + if ("TEXTFILE".equals(storageFormat)) { - ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '" + filedDelimiter + "'\n"); + ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '" + fieldDelimiter + "'\n"); } - ddl.append("STORED AS " + format + "\n"); + ddl.append("STORED AS " + storageFormat + "\n"); ddl.append("LOCATION '" + getTableDir(flatDesc, storageDfsDir) + "';").append("\n"); + ddl.append("ALTER TABLE " + flatDesc.getTableName() + " SET TBLPROPERTIES('auto.purge'='true');\n"); return ddl.toString(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java ---------------------------------------------------------------------- diff --cc core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java index 8795e4c,9e53459..404db54 --- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java @@@ -89,13 -90,13 +91,13 @@@ public class DefaultChainedExecutable e @Override protected void onExecuteFinished(ExecuteResult result, ExecutableContext executableContext) { ExecutableManager mgr = getManager(); - + if (isDiscarded()) { setEndTime(System.currentTimeMillis()); - notifyUserStatusChange(executableContext, ExecutableState.DISCARDED); + onStatusChange(executableContext, result, ExecutableState.DISCARDED); } else if (isPaused()) { setEndTime(System.currentTimeMillis()); - notifyUserStatusChange(executableContext, ExecutableState.STOPPED); + onStatusChange(executableContext, result, ExecutableState.STOPPED); } else if (result.succeed()) { List<? 
extends Executable> jobs = getTasks(); boolean allSucceed = true; @@@ -131,7 -125,9 +133,7 @@@ } else if (hasError) { setEndTime(System.currentTimeMillis()); mgr.updateJobOutput(getId(), ExecutableState.ERROR, null, null); - notifyUserStatusChange(executableContext, ExecutableState.ERROR); + onStatusChange(executableContext, result, ExecutableState.ERROR); - } else if (hasRunning) { - mgr.updateJobOutput(getId(), ExecutableState.RUNNING, null, null); } else if (hasDiscarded) { setEndTime(System.currentTimeMillis()); mgr.updateJobOutput(getId(), ExecutableState.DISCARDED, null, null); @@@ -145,9 -141,8 +147,8 @@@ } } - @Override - public List<AbstractExecutable> getTasks() { - return subTasks; + protected void onStatusChange(ExecutableContext context, ExecuteResult result, ExecutableState state) { - notifyUserStatusChange(context, state); ++ super.notifyUserStatusChange(context, state); } @Override @@@ -170,31 -165,8 +171,36 @@@ this.subTasks.add(executable); } + private boolean retryFetchTaskStatus(Executable task) { + boolean hasRunning = false; + int retry = 1; + while (retry <= 10) { + ExecutableState retryState = task.getStatus(); + if (retryState == ExecutableState.RUNNING) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + logger.error("Failed to Sleep: ", e); + } + hasRunning = true; + logger.error("With {} times retry, it's state is still RUNNING", retry); + } else { + logger.info("With {} times retry, status is changed to: {}", retry, retryState); + hasRunning = false; + break; + } + retry++; + } + if (hasRunning) { + logger.error("Parent task: {} is finished, but it's subtask: {}'s state is still RUNNING \n" + + ", mark parent task failed.", getName(), task.getName()); + return false; + } + return true; + } ++ + @Override + public int getDefaultPriority() { + return DEFAULT_PRIORITY; + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java ---------------------------------------------------------------------- diff --cc core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java index 69d85ca,287f215..6af0192 --- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java @@@ -49,9 -52,9 +52,39 @@@ import com.google.common.collect.Maps */ public class DefaultScheduler implements Scheduler<AbstractExecutable>, ConnectionStateListener { ++ private static DefaultScheduler INSTANCE = null; ++ ++ public static DefaultScheduler getInstance() { ++ if (INSTANCE == null) { ++ INSTANCE = createInstance(); ++ } ++ return INSTANCE; ++ } ++ ++ public synchronized static DefaultScheduler createInstance() { ++ destroyInstance(); ++ INSTANCE = new DefaultScheduler(); ++ return INSTANCE; ++ } ++ ++ public synchronized static void destroyInstance() { ++ DefaultScheduler tmp = INSTANCE; ++ INSTANCE = null; ++ if (tmp != null) { ++ try { ++ tmp.shutdown(); ++ } catch (SchedulerException e) { ++ logger.error("error stop DefaultScheduler", e); ++ throw new RuntimeException(e); ++ } ++ } ++ } ++ ++ // ============================================================================ ++ private JobLock jobLock; private ExecutableManager executableManager; - private 
FetcherRunner fetcher; + private Runnable fetcher; private ScheduledExecutorService fetcherPool; private ExecutorService jobPool; private DefaultContext context; @@@ -59,11 -62,10 +92,9 @@@ private static final Logger logger = LoggerFactory.getLogger(DefaultScheduler.class); private volatile boolean initialized = false; private volatile boolean hasStarted = false; + volatile boolean fetchFailed = false; private JobEngineConfig jobEngineConfig; -- private static DefaultScheduler INSTANCE = null; -- public DefaultScheduler() { if (INSTANCE != null) { throw new IllegalStateException("DefaultScheduler has been initiated."); @@@ -195,25 -275,25 +322,6 @@@ } } -- public synchronized static DefaultScheduler createInstance() { -- destroyInstance(); -- INSTANCE = new DefaultScheduler(); -- return INSTANCE; -- } -- -- public synchronized static void destroyInstance() { -- DefaultScheduler tmp = INSTANCE; -- INSTANCE = null; -- if (tmp != null) { -- try { -- tmp.shutdown(); -- } catch (SchedulerException e) { -- logger.error("error stop DefaultScheduler", e); -- throw new RuntimeException(e); -- } -- } -- } -- @Override public synchronized void init(JobEngineConfig jobEngineConfig, JobLock lock) throws SchedulerException { jobLock = lock;