APACHE-KYLIN-2783: Refactor CuboidScheduler to be extensible
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/721c64dd Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/721c64dd Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/721c64dd Branch: refs/heads/yaho-cube-planner Commit: 721c64ddd2977ae82975d77e16e50e7f3c4659ec Parents: b8f0843 Author: Zhong <nju_y...@apache.org> Authored: Thu Aug 17 20:17:26 2017 +0800 Committer: Zhong <nju_y...@apache.org> Committed: Thu Aug 17 20:17:26 2017 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/cube/CubeDescManager.java | 3 - .../org/apache/kylin/cube/CubeInstance.java | 16 ++ .../java/org/apache/kylin/cube/CubeManager.java | 9 +- .../java/org/apache/kylin/cube/CubeSegment.java | 5 + .../kylin/cube/common/RowKeySplitter.java | 4 +- .../org/apache/kylin/cube/cuboid/Cuboid.java | 179 ++++++------------- .../org/apache/kylin/cube/cuboid/CuboidCLI.java | 21 +-- .../kylin/cube/cuboid/CuboidScheduler.java | 18 ++ .../cube/cuboid/DefaultCuboidScheduler.java | 156 +++++++++++++++- .../inmemcubing/AbstractInMemCubeBuilder.java | 17 +- .../cube/inmemcubing/DoggedCubeBuilder.java | 9 +- .../cube/inmemcubing/InMemCubeBuilder.java | 11 +- .../org/apache/kylin/cube/kv/RowKeyDecoder.java | 4 +- .../org/apache/kylin/cube/kv/RowKeyEncoder.java | 11 +- .../org/apache/kylin/cube/model/CubeDesc.java | 35 ++-- .../cube/model/CubeJoinedFlatTableEnrich.java | 7 +- .../org/apache/kylin/cube/util/CubingUtils.java | 2 +- .../kylin/cube/cuboid/CuboidSchedulerTest.java | 26 +-- .../apache/kylin/cube/cuboid/CuboidTest.java | 60 ++++--- .../apache/kylin/cube/kv/RowKeyDecoderTest.java | 2 +- .../apache/kylin/cube/kv/RowKeyEncoderTest.java | 6 +- .../gtrecord/GTCubeStorageQueryBase.java | 6 +- .../kylin/storage/translate/HBaseKeyRange.java | 2 +- .../kylin/engine/mr/BatchCubingJobBuilder.java | 2 +- .../kylin/engine/mr/BatchCubingJobBuilder2.java | 2 +- .../engine/mr/common/BaseCuboidBuilder.java | 14 +- .../kylin/engine/mr/common/CubeStatsReader.java | 7 +- .../mr/steps/FactDistinctColumnsMapper.java | 2 +- .../engine/mr/steps/InMemCuboidMapper.java | 3 +- .../kylin/engine/mr/steps/KVGTRecordWriter.java | 2 +- .../engine/mr/steps/MergeCuboidMapper.java | 2 +- .../kylin/engine/mr/steps/NDCuboidMapper.java | 6 +- .../apache/kylin/engine/spark/SparkCubing.java | 9 +- .../kylin/engine/spark/SparkCubingByLayer.java | 10 +- .../spark/cube/DefaultTupleConverter.java | 2 +- .../ITDoggedCubeBuilderStressTest.java | 2 +- .../inmemcubing/ITDoggedCubeBuilderTest.java | 4 +- .../inmemcubing/ITInMemCubeBuilderTest.java | 2 +- .../storage/hbase/steps/HBaseCuboidWriter.java | 4 +- .../cube/MeasureTypeOnlyAggrInBaseTest.java | 4 +- 40 files changed, 406 insertions(+), 280 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/721c64dd/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java index 6635366..f49c9be 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java @@ -171,7 +171,6 @@ public class CubeDescManager { CubeDesc ndesc = loadCubeDesc(CubeDesc.concatResourcePath(name), false); cubeDescMap.putLocal(ndesc.getName(), ndesc); - Cuboid.reloadCache(name); // if related cube is in DESCBROKEN state before, change it back to DISABLED CubeManager cubeManager = CubeManager.getInstance(config); @@ -297,13 +296,11 @@ public class CubeDescManager { String path = cubeDesc.getResourcePath(); getStore().deleteResource(path); cubeDescMap.remove(cubeDesc.getName()); - Cuboid.reloadCache(cubeDesc.getName()); } // remove cubeDesc public void removeLocalCubeDesc(String name) throws IOException { cubeDescMap.removeLocal(name); - Cuboid.reloadCache(name); } private void reloadAllCubeDesc() throws IOException { http://git-wip-us.apache.org/repos/asf/kylin/blob/721c64dd/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java index fb9a7a7..6369b38 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java @@ -27,6 +27,7 @@ import org.apache.kylin.common.KylinConfigExt; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.RootPersistentEntity; import org.apache.kylin.common.util.Pair; +import org.apache.kylin.cube.cuboid.CuboidScheduler; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.metadata.model.ColumnDesc; import org.apache.kylin.metadata.model.DataModelDesc; @@ -93,10 +94,25 @@ public class CubeInstance extends RootPersistentEntity implements IRealization, @JsonProperty("create_time_utc") private long createTimeUTC; + // cuboid scheduler lazy built + transient private CuboidScheduler cuboidScheduler; + // default constructor for jackson public CubeInstance() { } + public CuboidScheduler getCuboidScheduler() { + if (cuboidScheduler != null) + return cuboidScheduler; + + synchronized (this) { + if (cuboidScheduler == null) { + cuboidScheduler = getDescriptor().getInitialCuboidScheduler(); + } + } + return cuboidScheduler; + } + public List<CubeSegment> getBuildingSegments() { return segments.getBuildingSegments(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/721c64dd/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index 4215746..739400a 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -43,6 +43,7 @@ import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.Serializer; import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.common.util.Pair; +import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.DictionaryDesc; import org.apache.kylin.dict.DictionaryInfo; @@ -311,6 +312,7 @@ public class CubeManager implements IRealizationProvider { // remove cube and update cache getStore().deleteResource(cube.getResourcePath()); cubeMap.remove(cube.getName()); + Cuboid.reloadCache(cube); // delete cube from project ProjectManager.getInstance(config).removeRealizationsFromProjects(RealizationType.CUBE, cubeName); @@ -658,17 +660,20 @@ public class CubeManager implements IRealizationProvider { * @param cubeName */ public CubeInstance reloadCubeLocal(String cubeName) { - return reloadCubeLocalAt(CubeInstance.concatResourcePath(cubeName)); + CubeInstance cubeInstance = reloadCubeLocalAt(CubeInstance.concatResourcePath(cubeName)); + Cuboid.reloadCache(cubeInstance); + return cubeInstance; } public void removeCubeLocal(String cubeName) { CubeInstance cube = cubeMap.get(cubeName); if (cube != null) { + cubeMap.removeLocal(cubeName); for (CubeSegment segment : cube.getSegments()) { usedStorageLocation.remove(segment.getUuid()); } + Cuboid.reloadCache(cube); } - cubeMap.removeLocal(cubeName); } public LookupStringTable getLookupTable(CubeSegment cubeSegment, JoinDesc join) { http://git-wip-us.apache.org/repos/asf/kylin/blob/721c64dd/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java index 1b28bd8..5b2a644 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java @@ -31,6 +31,7 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.common.util.ShardingHash; +import org.apache.kylin.cube.cuboid.CuboidScheduler; import org.apache.kylin.cube.kv.CubeDimEncMap; import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.cube.model.CubeDesc; @@ -119,6 +120,10 @@ public class CubeSegment implements Comparable<CubeSegment>, IBuildable, ISegmen return getCubeInstance().getDescriptor(); } + public CuboidScheduler getCuboidScheduler() { + return getCubeInstance().getCuboidScheduler(); + } + /** * @param startDate * @param endDate http://git-wip-us.apache.org/repos/asf/kylin/blob/721c64dd/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java b/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java index acebce4..cd26347 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java @@ -32,6 +32,7 @@ import org.apache.kylin.metadata.model.TblColRef; public class RowKeySplitter implements java.io.Serializable { + private CubeSegment cubeSegment; private CubeDesc cubeDesc; private RowKeyColumnIO colIO; @@ -63,6 +64,7 @@ public class RowKeySplitter implements java.io.Serializable { } public RowKeySplitter(CubeSegment cubeSeg, int splitLen, int bytesLen) { + this.cubeSegment = cubeSeg; this.enableSharding = cubeSeg.isEnableSharding(); this.cubeDesc = cubeSeg.getCubeDesc(); IDimensionEncodingMap dimEncoding = new CubeDimEncMap(cubeSeg); @@ -113,7 +115,7 @@ public class RowKeySplitter implements java.io.Serializable { offset += RowConstants.ROWKEY_CUBOIDID_LEN; long lastSplittedCuboidId = Bytes.toLong(cuboidIdSplit.value, 0, cuboidIdSplit.length); - Cuboid cuboid = Cuboid.findById(cubeDesc, lastSplittedCuboidId); + Cuboid cuboid = Cuboid.findById(cubeSegment, lastSplittedCuboidId); // rowkey columns for (int i = 0; i < cuboid.getColumns().size(); i++) { http://git-wip-us.apache.org/repos/asf/kylin/blob/721c64dd/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java index b71608c..b2cf339 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java @@ -21,15 +21,15 @@ package org.apache.kylin.cube.cuboid; import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping; import org.apache.kylin.cube.model.AggregationGroup; import org.apache.kylin.cube.model.AggregationGroup.HierarchyMask; @@ -38,16 +38,15 @@ import org.apache.kylin.cube.model.RowKeyColDesc; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.TblColRef; -import com.google.common.base.Function; -import com.google.common.base.Preconditions; -import com.google.common.collect.Collections2; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ComparisonChain; -import com.google.common.collect.Lists; +import com.google.common.collect.Maps; @SuppressWarnings("serial") public class Cuboid implements Comparable<Cuboid>, Serializable { - private final static Map<String, Map<Long, Cuboid>> CUBOID_CACHE = new ConcurrentHashMap<String, Map<Long, Cuboid>>(); + //TODO Guava cache may be better + private final static Map<String, Map<Long, Cuboid>> CUBOID_CACHE = Maps.newConcurrentMap(); // smaller is better public final static Comparator<Long> cuboidSelectComparator = new Comparator<Long>() { @@ -57,10 +56,22 @@ public class Cuboid implements Comparable<Cuboid>, Serializable { } }; - // this is the only entry point for query to find the right cuboid - public static Cuboid identifyCuboid(CubeDesc cubeDesc, Set<TblColRef> dimensions, Collection<FunctionDesc> metrics) { - long cuboidID = identifyCuboidId(cubeDesc, dimensions, metrics); - return Cuboid.findById(cubeDesc, cuboidID); + // this is the only entry point for query to find the right cuboid for a segment + public static Cuboid identifyCuboid(CubeSegment cubeSegment, Set<TblColRef> dimensions, + Collection<FunctionDesc> metrics) { + return identifyCuboid(cubeSegment.getCuboidScheduler(), dimensions, metrics); + } + + // this is the only entry point for query to find the right cuboid for a cube instance + public static Cuboid identifyCuboid(CubeInstance cubeInstance, Set<TblColRef> dimensions, + Collection<FunctionDesc> metrics) { + return identifyCuboid(cubeInstance.getCuboidScheduler(), dimensions, metrics); + } + + public static Cuboid identifyCuboid(CuboidScheduler cuboidScheduler, Set<TblColRef> dimensions, + Collection<FunctionDesc> metrics) { + long cuboidID = identifyCuboidId(cuboidScheduler.getCubeDesc(), dimensions, metrics); + return Cuboid.findById(cuboidScheduler, cuboidID); } public static long identifyCuboidId(CubeDesc cubeDesc, Set<TblColRef> dimensions, Collection<FunctionDesc> metrics) { @@ -77,32 +88,45 @@ public class Cuboid implements Comparable<Cuboid>, Serializable { return cuboidID; } - // for full cube, no need to translate cuboid - public static Cuboid findForFullCube(CubeDesc cube, long cuboidID) { + // for mandatory cuboid, no need to translate cuboid + public static Cuboid findForMandatory(CubeDesc cube, long cuboidID) { return new Cuboid(cube, cuboidID, cuboidID); } - public static Cuboid findById(CubeDesc cube, byte[] cuboidID) { - return findById(cube, Bytes.toLong(cuboidID)); + public static Cuboid findById(CuboidScheduler cuboidScheduler, byte[] cuboidID) { + return findById(cuboidScheduler, Bytes.toLong(cuboidID)); + } + + public static Cuboid findById(CubeSegment cubeSegment, long cuboidID) { + return findById(cubeSegment.getCuboidScheduler(), cuboidID); } - public static Cuboid findById(CubeDesc cube, long cuboidID) { - Map<Long, Cuboid> cubeCache = CUBOID_CACHE.get(cube.getName()); + public static Cuboid findById(CubeInstance cubeInstance, long cuboidID) { + return findById(cubeInstance.getCuboidScheduler(), cuboidID); + } + + @VisibleForTesting + static Cuboid findById(CubeDesc cubeDesc, long cuboidID) { + return findById(cubeDesc.getInitialCuboidScheduler(), cuboidID); + } + + public static Cuboid findById(CuboidScheduler cuboidScheduler, long cuboidID) { + Map<Long, Cuboid> cubeCache = CUBOID_CACHE.get(cuboidScheduler.getResponsibleKey()); if (cubeCache == null) { - cubeCache = new ConcurrentHashMap<Long, Cuboid>(); - CUBOID_CACHE.put(cube.getName(), cubeCache); + cubeCache = Maps.newConcurrentMap(); + CUBOID_CACHE.put(cuboidScheduler.getResponsibleKey(), cubeCache); } Cuboid cuboid = cubeCache.get(cuboidID); if (cuboid == null) { - long validCuboidID = translateToValidCuboid(cube, cuboidID); - cuboid = new Cuboid(cube, cuboidID, validCuboidID); + long validCuboidID = translateToValidCuboid(cuboidScheduler, cuboidID); + cuboid = new Cuboid(cuboidScheduler.getCubeDesc(), cuboidID, validCuboidID); cubeCache.put(cuboidID, cuboid); } return cuboid; } - public static boolean isValid(CubeDesc cube, long cuboidID) { - return cube.getAllCuboids().contains(cuboidID); + public static boolean isValid(CuboidScheduler cuboidScheduler, long cuboidID) { + return cuboidScheduler.getAllCuboidIds().contains(cuboidID); } public static long getBaseCuboidId(CubeDesc cube) { @@ -110,107 +134,14 @@ public class Cuboid implements Comparable<Cuboid>, Serializable { } public static Cuboid getBaseCuboid(CubeDesc cube) { - return findById(cube, getBaseCuboidId(cube)); + return findForMandatory(cube, getBaseCuboidId(cube)); } - static long translateToValidCuboid(CubeDesc cubeDesc, long cuboidID) { - long baseCuboidId = getBaseCuboidId(cubeDesc); - if (cubeDesc.getAllCuboids().contains(cuboidID)) { + static long translateToValidCuboid(CuboidScheduler cuboidScheduler, long cuboidID) { + if (isValid(cuboidScheduler, cuboidID)) { return cuboidID; } - List<Long> onTreeCandidates = Lists.newArrayList(); - for (AggregationGroup agg : cubeDesc.getAggregationGroups()) { - Long candidate = translateToOnTreeCuboid(agg, cuboidID); - if (candidate != null) { - onTreeCandidates.add(candidate); - } - } - - if (onTreeCandidates.size() == 0) { - return baseCuboidId; - } - - long onTreeCandi = Collections.min(onTreeCandidates, cuboidSelectComparator); - if (isValid(cubeDesc, onTreeCandi)) { - return onTreeCandi; - } - - return cubeDesc.getCuboidScheduler().findBestMatchCuboid(onTreeCandi); - } - - private static Long translateToOnTreeCuboid(AggregationGroup agg, long cuboidID) { - if ((cuboidID & ~agg.getPartialCubeFullMask()) > 0) { - //the partial cube might not contain all required dims - return null; - } - - // add mandantory - cuboidID = cuboidID | agg.getMandatoryColumnMask(); - - // add hierarchy - for (HierarchyMask hierarchyMask : agg.getHierarchyMasks()) { - long fullMask = hierarchyMask.fullMask; - long intersect = cuboidID & fullMask; - if (intersect != 0 && intersect != fullMask) { - - boolean startToFill = false; - for (int i = hierarchyMask.dims.length - 1; i >= 0; i--) { - if (startToFill) { - cuboidID |= hierarchyMask.dims[i]; - } else { - if ((cuboidID & hierarchyMask.dims[i]) != 0) { - startToFill = true; - cuboidID |= hierarchyMask.dims[i]; - } - } - } - } - } - - // add joint dims - for (Long joint : agg.getJoints()) { - if (((cuboidID | joint) != cuboidID) && ((cuboidID & ~joint) != cuboidID)) { - cuboidID = cuboidID | joint; - } - } - - if (!agg.isOnTree(cuboidID)) { - // no column, add one column - long nonJointDims = removeBits((agg.getPartialCubeFullMask() ^ agg.getMandatoryColumnMask()), agg.getJoints()); - if (nonJointDims != 0) { - long nonJointNonHierarchy = removeBits(nonJointDims, Collections2.transform(agg.getHierarchyMasks(), new Function<HierarchyMask, Long>() { - @Override - public Long apply(HierarchyMask input) { - return input.fullMask; - } - })); - if (nonJointNonHierarchy != 0) { - //there exists dim that does not belong to any joint or any hierarchy, that's perfect - return cuboidID | Long.lowestOneBit(nonJointNonHierarchy); - } else { - //choose from a hierarchy that does not intersect with any joint dim, only check level 1 - long allJointDims = agg.getJointDimsMask(); - for (HierarchyMask hierarchyMask : agg.getHierarchyMasks()) { - long dim = hierarchyMask.allMasks[0]; - if ((dim & allJointDims) == 0) { - return cuboidID | dim; - } - } - } - } - - cuboidID = cuboidID | Collections.min(agg.getJoints(), cuboidSelectComparator); - Preconditions.checkState(agg.isOnTree(cuboidID)); - } - return cuboidID; - } - - private static long removeBits(long original, Collection<Long> toRemove) { - long ret = original; - for (Long joint : toRemove) { - ret = ret & ~joint; - } - return ret; + return cuboidScheduler.findBestMatchCuboid(cuboidID); } // ============================================================================ @@ -310,8 +241,12 @@ public class Cuboid implements Comparable<Cuboid>, Serializable { CUBOID_CACHE.clear(); } - public static void reloadCache(String cubeDescName) { - CUBOID_CACHE.remove(cubeDescName); + public static void reloadCache(CubeInstance cubeInstance) { + reloadCache(cubeInstance.getCuboidScheduler()); + } + + private static void reloadCache(CuboidScheduler cuboidScheduler) { + CUBOID_CACHE.remove(cuboidScheduler.getResponsibleKey()); } @Override http://git-wip-us.apache.org/repos/asf/kylin/blob/721c64dd/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidCLI.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidCLI.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidCLI.java index d657a43..d27ca7a 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidCLI.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidCLI.java @@ -44,7 +44,7 @@ public class CuboidCLI { } public static int simulateCuboidGeneration(CubeDesc cubeDesc, boolean validate) { - CuboidScheduler scheduler = cubeDesc.getCuboidScheduler(); + CuboidScheduler scheduler = cubeDesc.getInitialCuboidScheduler(); long baseCuboid = Cuboid.getBaseCuboidId(cubeDesc); Collection<Long> cuboidSet = new TreeSet<Long>(); cuboidSet.add(baseCuboid); @@ -88,28 +88,29 @@ public class CuboidCLI { //check all valid and invalid for (long i = 0; i < baseCuboid; ++i) { if (cuboidSet.contains(i)) { - if (!Cuboid.isValid(cubeDesc, i)) { + if (!Cuboid.isValid(cubeDesc.getInitialCuboidScheduler(), i)) { throw new RuntimeException(); } - if (Cuboid.translateToValidCuboid(cubeDesc, i) != i) { + if (Cuboid.translateToValidCuboid(cubeDesc.getInitialCuboidScheduler(), i) != i) { throw new RuntimeException(); } } else { - if (Cuboid.isValid(cubeDesc, i)) { + if (Cuboid.isValid(cubeDesc.getInitialCuboidScheduler(), i)) { throw new RuntimeException(); } - long corrected = Cuboid.translateToValidCuboid(cubeDesc, i); + long corrected = Cuboid.translateToValidCuboid(cubeDesc.getInitialCuboidScheduler(), i); if (corrected == i) { throw new RuntimeException(); } - if (!Cuboid.isValid(cubeDesc, corrected)) { + if (!Cuboid.isValid(cubeDesc.getInitialCuboidScheduler(), corrected)) { throw new RuntimeException(); } - if (Cuboid.translateToValidCuboid(cubeDesc, corrected) != corrected) { + if (Cuboid.translateToValidCuboid(cubeDesc.getInitialCuboidScheduler(), + corrected) != corrected) { throw new RuntimeException(); } } @@ -125,7 +126,7 @@ public class CuboidCLI { long baseCuboid = Cuboid.getBaseCuboidId(cube); TreeSet<Long> expectedCuboids = new TreeSet<Long>(); for (long cuboid = 0; cuboid <= baseCuboid; cuboid++) { - if (Cuboid.isValid(cube, cuboid)) { + if (Cuboid.isValid(cube.getInitialCuboidScheduler(), cuboid)) { expectedCuboids.add(cuboid); } } @@ -133,10 +134,10 @@ public class CuboidCLI { } public static int[] calculateAllLevelCount(CubeDesc cube) { - int levels = cube.getBuildLevel(); + int levels = cube.getInitialBuildLevel(); int[] allLevelCounts = new int[levels + 1]; - CuboidScheduler scheduler = cube.getCuboidScheduler(); + CuboidScheduler scheduler = cube.getInitialCuboidScheduler(); LinkedList<Long> nextQueue = new LinkedList<Long>(); LinkedList<Long> currentQueue = new LinkedList<Long>(); long baseCuboid = Cuboid.getBaseCuboidId(cube); http://git-wip-us.apache.org/repos/asf/kylin/blob/721c64dd/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java index 1d8f589..c08b9db 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java @@ -63,10 +63,21 @@ abstract public class CuboidScheduler { /** Returns a cuboid on the tree that best matches the request cuboid. */ abstract public long findBestMatchCuboid(long requestCuboid); + /** Returns the key for what this cuboid scheduler responsible for*/ + abstract public String getResponsibleKey(); + // ============================================================================ private transient List<List<Long>> cuboidsByLayer; + public long getBaseCuboidId() { + return Cuboid.getBaseCuboidId(cubeDesc); + } + + public CubeDesc getCubeDesc() { + return cubeDesc; + } + /** * Get cuboids by layer. It's built from pre-expanding tree. * @return layered cuboids @@ -101,4 +112,11 @@ abstract public class CuboidScheduler { return cuboidsByLayer; } + /** + * Get cuboid level count except base cuboid + * @return + */ + public int getBuildLevel() { + return getCuboidsByLayer().size() - 1; + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/721c64dd/core-cube/src/main/java/org/apache/kylin/cube/cuboid/DefaultCuboidScheduler.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/DefaultCuboidScheduler.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/DefaultCuboidScheduler.java index b75acd5..c756079 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/DefaultCuboidScheduler.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/DefaultCuboidScheduler.java @@ -35,9 +35,13 @@ import javax.annotation.Nullable; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.model.AggregationGroup; +import org.apache.kylin.cube.model.AggregationGroup.HierarchyMask; import org.apache.kylin.cube.model.CubeDesc; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; import com.google.common.base.Predicate; +import com.google.common.collect.Collections2; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -159,26 +163,160 @@ public class DefaultCuboidScheduler extends CuboidScheduler { } /** - * Get the parent cuboid really on the spanning tree. - * @param child an on-tree cuboid + * Get the parent cuboid rely on the spanning tree. + * @param cuboid an on-tree cuboid * @return */ - @Override - public long findBestMatchCuboid(long child) { - long parent = getOnTreeParent(child); + public long findBestMatchCuboid(long cuboid) { + return findBestMatchCuboid1(cuboid); + } + + public long findBestMatchCuboid2(long cuboid) { + long bestParent = doFindBestMatchCuboid2(cuboid, Cuboid.getBaseCuboidId(cubeDesc)); + if (bestParent < -1) { + throw new IllegalStateException("Cannot find the parent of the cuboid:" + cuboid); + } + return bestParent; + } + + private long doFindBestMatchCuboid2(long cuboid, long parent) { + if (!canDerive(cuboid, parent)) { + return -1; + } + List<Long> children = parent2child.get(parent); + List<Long> candidates = Lists.newArrayList(); + if (children != null) { + for (long child : children) { + long candidate = doFindBestMatchCuboid2(cuboid, child); + if (candidate > 0) { + candidates.add(candidate); + } + } + } + if (candidates.isEmpty()) { + candidates.add(parent); + } + + return Collections.min(candidates, Cuboid.cuboidSelectComparator); + } + + private boolean canDerive(long cuboidId, long parentCuboid) { + return (cuboidId & ~parentCuboid) == 0; + } + + public long findBestMatchCuboid1(long cuboid) { + List<Long> onTreeCandidates = Lists.newArrayList(); + for (AggregationGroup agg : cubeDesc.getAggregationGroups()) { + Long candidate = translateToOnTreeCuboid(agg, cuboid); + if (candidate != null) { + onTreeCandidates.add(candidate); + } + } + + if (onTreeCandidates.size() == 0) { + return Cuboid.getBaseCuboidId(cubeDesc); + } + + long onTreeCandi = Collections.min(onTreeCandidates, Cuboid.cuboidSelectComparator); + if (allCuboidIds.contains(onTreeCandi)) { + return onTreeCandi; + } + + return doFindBestMatchCuboid1(onTreeCandi); + } + + public long doFindBestMatchCuboid1(long cuboid) { + long parent = getOnTreeParent(cuboid); while (parent > 0) { - if (cubeDesc.getAllCuboids().contains(parent)) { + if (allCuboidIds.contains(parent)) { break; } parent = getOnTreeParent(parent); } if (parent <= 0) { - throw new IllegalStateException("Can't find valid parent for Cuboid " + child); + throw new IllegalStateException("Can't find valid parent for Cuboid " + cuboid); } return parent; } + private static Long translateToOnTreeCuboid(AggregationGroup agg, long cuboidID) { + if ((cuboidID & ~agg.getPartialCubeFullMask()) > 0) { + //the partial cube might not contain all required dims + return null; + } + + // add mandantory + cuboidID = cuboidID | agg.getMandatoryColumnMask(); + + // add hierarchy + for (HierarchyMask hierarchyMask : agg.getHierarchyMasks()) { + long fullMask = hierarchyMask.fullMask; + long intersect = cuboidID & fullMask; + if (intersect != 0 && intersect != fullMask) { + + boolean startToFill = false; + for (int i = hierarchyMask.dims.length - 1; i >= 0; i--) { + if (startToFill) { + cuboidID |= hierarchyMask.dims[i]; + } else { + if ((cuboidID & hierarchyMask.dims[i]) != 0) { + startToFill = true; + cuboidID |= hierarchyMask.dims[i]; + } + } + } + } + } + + // add joint dims + for (Long joint : agg.getJoints()) { + if (((cuboidID | joint) != cuboidID) && ((cuboidID & ~joint) != cuboidID)) { + cuboidID = cuboidID | joint; + } + } + + if (!agg.isOnTree(cuboidID)) { + // no column, add one column + long nonJointDims = removeBits((agg.getPartialCubeFullMask() ^ agg.getMandatoryColumnMask()), + agg.getJoints()); + if (nonJointDims != 0) { + long nonJointNonHierarchy = removeBits(nonJointDims, + Collections2.transform(agg.getHierarchyMasks(), new Function<HierarchyMask, Long>() { + @Override + public Long apply(HierarchyMask input) { + return input.fullMask; + } + })); + if (nonJointNonHierarchy != 0) { + //there exists dim that does not belong to any joint or any hierarchy, that's perfect + return cuboidID | Long.lowestOneBit(nonJointNonHierarchy); + } else { + //choose from a hierarchy that does not intersect with any joint dim, only check level 1 + long allJointDims = agg.getJointDimsMask(); + for (HierarchyMask hierarchyMask : agg.getHierarchyMasks()) { + long dim = hierarchyMask.allMasks[0]; + if ((dim & allJointDims) == 0) { + return cuboidID | dim; + } + } + } + } + + cuboidID = cuboidID | Collections.min(agg.getJoints(), Cuboid.cuboidSelectComparator); + Preconditions.checkState(agg.isOnTree(cuboidID)); + } + return cuboidID; + } + + private static long removeBits(long original, Collection<Long> toRemove) { + long ret = original; + for (Long joint : toRemove) { + ret = ret & ~joint; + } + return ret; + } + private long getOnTreeParent(long child) { Collection<Long> candidates = getOnTreeParents(child); if (candidates == null || candidates.isEmpty()) { @@ -257,4 +395,8 @@ public class DefaultCuboidScheduler extends CuboidScheduler { return parentCandidate; } + + public String getResponsibleKey() { + return CubeDesc.class.getName() + "-" + cubeDesc.getName(); + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/721c64dd/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java index c7a4a05..ae38261 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.concurrent.BlockingQueue; import org.apache.kylin.common.util.Dictionary; +import org.apache.kylin.cube.cuboid.CuboidScheduler; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.gridtable.GTScanRequest; @@ -42,6 +43,7 @@ abstract public class AbstractInMemCubeBuilder { private static Logger logger = LoggerFactory.getLogger(AbstractInMemCubeBuilder.class); + final protected CuboidScheduler cuboidScheduler; final protected IJoinedFlatTableDesc flatDesc; final protected CubeDesc cubeDesc; final protected Map<TblColRef, Dictionary<String>> dictionaryMap; @@ -49,16 +51,23 @@ abstract public class AbstractInMemCubeBuilder { protected int taskThreadCount = 1; protected int reserveMemoryMB = 100; - public AbstractInMemCubeBuilder(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) { - if (flatDesc == null) + // @Deprecated + // public AbstractInMemCubeBuilder(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) { + // this(cubeDesc.getInitialCuboidScheduler(), cubeDesc, flatDesc, dictionaryMap); + // } + + protected AbstractInMemCubeBuilder(CuboidScheduler cuboidScheduler, IJoinedFlatTableDesc flatDesc, + Map<TblColRef, Dictionary<String>> dictionaryMap) { + if (cuboidScheduler == null) throw new NullPointerException(); - if (cubeDesc == null) + if (flatDesc == null) throw new NullPointerException(); if (dictionaryMap == null) throw new IllegalArgumentException("dictionary cannot be null"); + this.cuboidScheduler = cuboidScheduler; this.flatDesc = flatDesc; - this.cubeDesc = cubeDesc; + this.cubeDesc = cuboidScheduler.getCubeDesc(); this.dictionaryMap = dictionaryMap; } http://git-wip-us.apache.org/repos/asf/kylin/blob/721c64dd/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java index a9211da..dd92a2b 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java @@ -35,7 +35,7 @@ import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.common.util.MemoryBudgetController; -import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.cube.cuboid.CuboidScheduler; import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.gridtable.GTScanRequestBuilder; import org.apache.kylin.gridtable.IGTScanner; @@ -57,8 +57,9 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder { private int splitRowThreshold = Integer.MAX_VALUE; private int unitRows = 1000; - public DoggedCubeBuilder(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) { - super(cubeDesc, flatDesc, dictionaryMap); + public DoggedCubeBuilder(CuboidScheduler cuboidScheduler, IJoinedFlatTableDesc flatDesc, + Map<TblColRef, Dictionary<String>> dictionaryMap) { + super(cuboidScheduler, flatDesc, dictionaryMap); // check memory more often if a single row is big if (cubeDesc.hasMemoryHungryMeasures()) @@ -276,7 +277,7 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder { RuntimeException exception; public SplitThread() { - this.builder = new InMemCubeBuilder(cubeDesc, flatDesc, dictionaryMap); + this.builder = new InMemCubeBuilder(cuboidScheduler, flatDesc, dictionaryMap); this.builder.setConcurrentThreads(taskThreadCount); this.builder.setReserveMemoryMB(reserveMemoryMB); } http://git-wip-us.apache.org/repos/asf/kylin/blob/721c64dd/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java index 93736c9..684c26b 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java @@ -39,7 +39,6 @@ import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.cuboid.CuboidScheduler; import org.apache.kylin.cube.gridtable.CubeGridTable; import org.apache.kylin.cube.kv.CubeDimEncMap; -import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.gridtable.GTAggregateScanner; import org.apache.kylin.gridtable.GTBuilder; import org.apache.kylin.gridtable.GTInfo; @@ -71,7 +70,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { private static final double DERIVE_AGGR_CACHE_CONSTANT_FACTOR = 0.1; private static final double DERIVE_AGGR_CACHE_VARIABLE_FACTOR = 0.9; - private final CuboidScheduler cuboidScheduler; private final long baseCuboidId; private final int totalCuboidCount; private final String[] metricsAggrFuncs; @@ -90,9 +88,9 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { private Object[] totalSumForSanityCheck; private ICuboidCollector resultCollector; - public InMemCubeBuilder(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) { - super(cubeDesc, flatDesc, dictionaryMap); - this.cuboidScheduler = cubeDesc.getCuboidScheduler(); + public InMemCubeBuilder(CuboidScheduler cuboidScheduler, IJoinedFlatTableDesc flatDesc, + Map<TblColRef, Dictionary<String>> dictionaryMap) { + super(cuboidScheduler, flatDesc, dictionaryMap); this.baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); this.totalCuboidCount = cuboidScheduler.getCuboidCount(); @@ -109,8 +107,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { } private GridTable newGridTableByCuboidID(long cuboidID) throws IOException { - GTInfo info = CubeGridTable.newGTInfo( - Cuboid.findById(cubeDesc, cuboidID), + GTInfo info = CubeGridTable.newGTInfo(Cuboid.findById(cuboidScheduler, cuboidID), new CubeDimEncMap(cubeDesc, dictionaryMap) ); http://git-wip-us.apache.org/repos/asf/kylin/blob/721c64dd/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java index d5948d4..5a1f668 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java @@ -36,6 +36,7 @@ import org.apache.kylin.metadata.model.TblColRef; */ public class RowKeyDecoder { + private final CubeSegment cubeSegment; private final CubeDesc cubeDesc; private final RowKeyColumnIO colIO; private final RowKeySplitter rowKeySplitter; @@ -44,6 +45,7 @@ public class RowKeyDecoder { private List<String> values; public RowKeyDecoder(CubeSegment cubeSegment) { + this.cubeSegment = cubeSegment; this.cubeDesc = cubeSegment.getCubeDesc(); this.rowKeySplitter = new RowKeySplitter(cubeSegment, 65, 255); this.colIO = new RowKeyColumnIO(cubeSegment.getDimensionEncodingMap()); @@ -73,7 +75,7 @@ public class RowKeyDecoder { if (this.cuboid != null && this.cuboid.getId() == cuboidID) { return; } - this.cuboid = Cuboid.findById(cubeDesc, cuboidID); + this.cuboid = Cuboid.findById(cubeSegment, cuboidID); } private void collectValue(TblColRef col, byte[] valueBytes, int length) throws IOException { http://git-wip-us.apache.org/repos/asf/kylin/blob/721c64dd/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java index a669fb1..40cda76 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java @@ -18,7 +18,11 @@ package org.apache.kylin.cube.kv; -import com.google.common.base.Preconditions; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; + import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.ImmutableBitSet; @@ -28,10 +32,7 @@ import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.metadata.model.TblColRef; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Set; +import com.google.common.base.Preconditions; public class RowKeyEncoder extends AbstractRowKeyEncoder implements java.io.Serializable { http://git-wip-us.apache.org/repos/asf/kylin/blob/721c64dd/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java index 5d8503d..510a7d5 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java @@ -52,6 +52,7 @@ import org.apache.kylin.common.util.Array; import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.cuboid.CuboidScheduler; +import org.apache.kylin.cube.cuboid.DefaultCuboidScheduler; import org.apache.kylin.measure.MeasureType; import org.apache.kylin.measure.extendedcolumn.ExtendedColumnMeasureType; import org.apache.kylin.metadata.MetadataConstants; @@ -188,9 +189,6 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware { @JsonInclude(JsonInclude.Include.NON_NULL) private int parentForward = 3; - // cuboid scheduler lazy built - transient private CuboidScheduler cuboidScheduler = null; - public boolean isEnableSharding() { //in the future may extend to other storage that is shard-able return storageType != IStorageAware.ID_HBASE && storageType != IStorageAware.ID_HYBRID; @@ -458,14 +456,6 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware { return true; } - /** - * Get cuboid level count except base cuboid - * @return - */ - public int getBuildLevel() { - return getCuboidScheduler().getCuboidsByLayer().size() - 1; - } - @Override public int hashCode() { int result = 0; @@ -556,7 +546,6 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware { hostToDerivedMap = Maps.newHashMap(); extendedColumnToHosts = Maps.newHashMap(); cuboidBlackSet = Sets.newHashSet(); - cuboidScheduler = null; } public void init(KylinConfig config) { @@ -612,16 +601,16 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware { amendAllColumns(); } - public CuboidScheduler getCuboidScheduler() { - if (cuboidScheduler != null) - return cuboidScheduler; - - synchronized (this) { - if (cuboidScheduler == null) { - cuboidScheduler = CuboidScheduler.getInstance(this); - } - return cuboidScheduler; - } + public CuboidScheduler getInitialCuboidScheduler() { + return new DefaultCuboidScheduler(this); + } + + /** + * Get cuboid level count except base cuboid + * @return + */ + public int getInitialBuildLevel() { + return getInitialCuboidScheduler().getCuboidsByLayer().size() - 1; } public boolean isBlackedCuboid(long cuboidID) { @@ -1142,7 +1131,7 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware { } public Set<Long> getAllCuboids() { - return getCuboidScheduler().getAllCuboidIds(); + return getInitialCuboidScheduler().getAllCuboidIds(); } public int getParentForward() { http://git-wip-us.apache.org/repos/asf/kylin/blob/721c64dd/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java index e829aeb..b16fce0 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java @@ -18,6 +18,8 @@ package org.apache.kylin.cube.model; +import java.util.List; + import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.metadata.model.DataModelDesc; import org.apache.kylin.metadata.model.FunctionDesc; @@ -26,8 +28,6 @@ import org.apache.kylin.metadata.model.ISegment; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.TblColRef; -import java.util.List; - /** * An enrich of IJoinedFlatTableDesc for cubes */ @@ -50,8 +50,7 @@ public class CubeJoinedFlatTableEnrich implements IJoinedFlatTableDesc, java.io. // check what columns from hive tables are required, and index them private void parseCubeDesc() { - long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); - Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId); + Cuboid baseCuboid = Cuboid.getBaseCuboid(cubeDesc); // build index for rowkey columns List<TblColRef> cuboidColumns = baseCuboid.getColumns(); http://git-wip-us.apache.org/repos/asf/kylin/blob/721c64dd/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java b/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java index 1c3c395..ec4c04a 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java @@ -57,7 +57,7 @@ public class CubingUtils { public static Map<Long, HLLCounter> sampling(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDescIn, Iterable<List<String>> streams) { final CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(flatDescIn, cubeDesc); final int rowkeyLength = cubeDesc.getRowkey().getRowKeyColumns().length; - final Set<Long> allCuboidIds = cubeDesc.getCuboidScheduler().getAllCuboidIds(); + final Set<Long> allCuboidIds = cubeDesc.getInitialCuboidScheduler().getAllCuboidIds(); final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); final Map<Long, Integer[]> allCuboidsBitSet = Maps.newHashMap(); http://git-wip-us.apache.org/repos/asf/kylin/blob/721c64dd/core-cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidSchedulerTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidSchedulerTest.java b/core-cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidSchedulerTest.java index 58c0edb..3992ef9 100644 --- a/core-cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidSchedulerTest.java +++ b/core-cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidSchedulerTest.java @@ -115,7 +115,7 @@ public class CuboidSchedulerTest extends LocalFileMetadataTestCase { System.out.println("Spanning result for " + cuboidId + "(" + Long.toBinaryString(cuboidId) + "): " + toString(spannings)); for (long child : spannings) { - assertTrue(Cuboid.isValid(cube, child)); + assertTrue(Cuboid.isValid(scheduler, child)); } } @@ -128,7 +128,7 @@ public class CuboidSchedulerTest extends LocalFileMetadataTestCase { @Test public void testGetSpanningCuboid2() { CubeDesc cube = getTestKylinCubeWithSeller(); - CuboidScheduler scheduler = cube.getCuboidScheduler(); + CuboidScheduler scheduler = cube.getInitialCuboidScheduler(); // generate 8d System.out.println("Spanning for 8D Cuboids"); @@ -156,7 +156,7 @@ public class CuboidSchedulerTest extends LocalFileMetadataTestCase { @Test public void testGetSpanningCuboid1() { CubeDesc cube = getTestKylinCubeWithoutSeller(); - CuboidScheduler scheduler = cube.getCuboidScheduler(); + CuboidScheduler scheduler = cube.getInitialCuboidScheduler(); // generate 7d System.out.println("Spanning for 7D Cuboids"); @@ -228,7 +228,7 @@ public class CuboidSchedulerTest extends LocalFileMetadataTestCase { @Test public void testCuboidCounts1() { CubeDesc cube = getTestKylinCubeWithoutSeller(); - CuboidScheduler cuboidScheduler = cube.getCuboidScheduler(); + CuboidScheduler cuboidScheduler = cube.getInitialCuboidScheduler(); int[] counts = CuboidCLI.calculateAllLevelCount(cube); printCount(counts); int sum = 0; @@ -241,7 +241,7 @@ public class CuboidSchedulerTest extends LocalFileMetadataTestCase { @Test public void testCuboidCounts2() { CubeDesc cube = getTestKylinCubeWithoutSellerLeftJoin(); - CuboidScheduler cuboidScheduler = cube.getCuboidScheduler(); + CuboidScheduler cuboidScheduler = cube.getInitialCuboidScheduler(); int[] counts = CuboidCLI.calculateAllLevelCount(cube); printCount(counts); int sum = 0; @@ -254,7 +254,7 @@ public class CuboidSchedulerTest extends LocalFileMetadataTestCase { @Test public void testCuboidCounts3() { CubeDesc cube = getTestKylinCubeWithSeller(); - CuboidScheduler cuboidScheduler = cube.getCuboidScheduler(); + CuboidScheduler cuboidScheduler = cube.getInitialCuboidScheduler(); int[] counts = CuboidCLI.calculateAllLevelCount(cube); printCount(counts); int sum = 0; @@ -267,7 +267,7 @@ public class CuboidSchedulerTest extends LocalFileMetadataTestCase { @Test public void testCuboidCounts4() { CubeDesc cube = getTestKylinCubeWithSellerLeft(); - CuboidScheduler cuboidScheduler = cube.getCuboidScheduler(); + CuboidScheduler cuboidScheduler = cube.getInitialCuboidScheduler(); int[] counts = CuboidCLI.calculateAllLevelCount(cube); printCount(counts); int sum = 0; @@ -280,7 +280,7 @@ public class CuboidSchedulerTest extends LocalFileMetadataTestCase { @Test public void testCuboidCounts5() { CubeDesc cube = getStreamingCubeDesc(); - CuboidScheduler cuboidScheduler = cube.getCuboidScheduler(); + CuboidScheduler cuboidScheduler = cube.getInitialCuboidScheduler(); int[] counts = CuboidCLI.calculateAllLevelCount(cube); printCount(counts); int sum = 0; @@ -293,7 +293,7 @@ public class CuboidSchedulerTest extends LocalFileMetadataTestCase { @Test public void testCuboidCounts6() { CubeDesc cube = getCIInnerJoinCube(); - CuboidScheduler cuboidScheduler = cube.getCuboidScheduler(); + CuboidScheduler cuboidScheduler = cube.getInitialCuboidScheduler(); int[] counts = CuboidCLI.calculateAllLevelCount(cube); printCount(counts); int sum = 0; @@ -306,7 +306,7 @@ public class CuboidSchedulerTest extends LocalFileMetadataTestCase { @Test public void testLargeCube() { CubeDesc cube = getFiftyDimCubeDesc(); - CuboidScheduler cuboidScheduler = cube.getCuboidScheduler(); + CuboidScheduler cuboidScheduler = cube.getInitialCuboidScheduler(); long start = System.currentTimeMillis(); System.out.println(cuboidScheduler.getCuboidCount()); System.out.println("build tree takes: " + (System.currentTimeMillis() - start) + "ms"); @@ -317,7 +317,7 @@ public class CuboidSchedulerTest extends LocalFileMetadataTestCase { File twentyFile = new File(new File(LocalFileMetadataTestCase.LOCALMETA_TEMP_DATA, "cube_desc"), "twenty_dim"); twentyFile.renameTo(new File(twentyFile.getPath().substring(0, twentyFile.getPath().length() - 4))); CubeDesc cube = getTwentyDimCubeDesc(); - CuboidScheduler cuboidScheduler = cube.getCuboidScheduler(); + CuboidScheduler cuboidScheduler = cube.getInitialCuboidScheduler(); cuboidScheduler.getCuboidCount(); twentyFile.renameTo(new File(twentyFile.getPath() + ".bad")); } @@ -332,10 +332,10 @@ public class CuboidSchedulerTest extends LocalFileMetadataTestCase { } CubeDescManager.clearCache(); CubeDesc cube = getCubeDescManager().getCubeDesc("ut_large_dimension_number"); - CuboidScheduler scheduler = cube.getCuboidScheduler(); + CuboidScheduler scheduler = cube.getInitialCuboidScheduler(); Cuboid baseCuboid = Cuboid.getBaseCuboid(cube); - assertTrue(Cuboid.isValid(cube, baseCuboid.getId())); + assertTrue(Cuboid.isValid(scheduler, baseCuboid.getId())); List<Long> spanningChild = scheduler.getSpanningCuboid(baseCuboid.getId()); assertTrue(spanningChild.size() > 0); http://git-wip-us.apache.org/repos/asf/kylin/blob/721c64dd/core-cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidTest.java b/core-cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidTest.java index 2e64791..ab0c666 100644 --- a/core-cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidTest.java +++ b/core-cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidTest.java @@ -73,25 +73,26 @@ public class CuboidTest extends LocalFileMetadataTestCase { public void testIsValid() { CubeDesc cube = getTestKylinCubeWithSeller(); + CuboidScheduler cuboidScheduler = cube.getInitialCuboidScheduler(); // base - assertEquals(false, Cuboid.isValid(cube, 0)); - assertEquals(true, Cuboid.isValid(cube, toLong("111111111"))); + assertEquals(false, Cuboid.isValid(cuboidScheduler, 0)); + assertEquals(true, Cuboid.isValid(cuboidScheduler, toLong("111111111"))); // mandatory column - assertEquals(false, Cuboid.isValid(cube, toLong("011111110"))); - assertEquals(false, Cuboid.isValid(cube, toLong("100000000"))); + assertEquals(false, Cuboid.isValid(cuboidScheduler, toLong("011111110"))); + assertEquals(false, Cuboid.isValid(cuboidScheduler, toLong("100000000"))); // zero tail - assertEquals(true, Cuboid.isValid(cube, toLong("111111000"))); + assertEquals(true, Cuboid.isValid(cuboidScheduler, toLong("111111000"))); // aggregation group & zero tail - assertEquals(true, Cuboid.isValid(cube, toLong("110000111"))); - assertEquals(true, Cuboid.isValid(cube, toLong("110111000"))); - assertEquals(true, Cuboid.isValid(cube, toLong("111110111"))); - assertEquals(false, Cuboid.isValid(cube, toLong("111110001"))); - assertEquals(false, Cuboid.isValid(cube, toLong("111110100"))); - assertEquals(false, Cuboid.isValid(cube, toLong("110000100"))); + assertEquals(true, Cuboid.isValid(cuboidScheduler, toLong("110000111"))); + assertEquals(true, Cuboid.isValid(cuboidScheduler, toLong("110111000"))); + assertEquals(true, Cuboid.isValid(cuboidScheduler, toLong("111110111"))); + assertEquals(false, Cuboid.isValid(cuboidScheduler, toLong("111110001"))); + assertEquals(false, Cuboid.isValid(cuboidScheduler, toLong("111110100"))); + assertEquals(false, Cuboid.isValid(cuboidScheduler, toLong("110000100"))); } @Test @@ -124,41 +125,45 @@ public class CuboidTest extends LocalFileMetadataTestCase { @Test public void testIsValid2() { CubeDesc cube = getTestKylinCubeWithoutSeller(); - assertEquals(false, Cuboid.isValid(cube, toLong("111111111"))); + CuboidScheduler cuboidScheduler = cube.getInitialCuboidScheduler(); + + assertEquals(false, Cuboid.isValid(cuboidScheduler, toLong("111111111"))); // base - assertEquals(false, Cuboid.isValid(cube, 0)); - assertEquals(true, Cuboid.isValid(cube, toLong("11111111"))); + assertEquals(false, Cuboid.isValid(cuboidScheduler, 0)); + assertEquals(true, Cuboid.isValid(cuboidScheduler, toLong("11111111"))); // aggregation group & zero tail - assertEquals(true, Cuboid.isValid(cube, toLong("10000111"))); - assertEquals(false, Cuboid.isValid(cube, toLong("10001111"))); - assertEquals(false, Cuboid.isValid(cube, toLong("11001111"))); - assertEquals(true, Cuboid.isValid(cube, toLong("10000001"))); - assertEquals(true, Cuboid.isValid(cube, toLong("10000101"))); + assertEquals(true, Cuboid.isValid(cuboidScheduler, toLong("10000111"))); + assertEquals(false, Cuboid.isValid(cuboidScheduler, toLong("10001111"))); + assertEquals(false, Cuboid.isValid(cuboidScheduler, toLong("11001111"))); + assertEquals(true, Cuboid.isValid(cuboidScheduler, toLong("10000001"))); + assertEquals(true, Cuboid.isValid(cuboidScheduler, toLong("10000101"))); // hierarchy - assertEquals(true, Cuboid.isValid(cube, toLong("10100000"))); - assertEquals(true, Cuboid.isValid(cube, toLong("10110000"))); - assertEquals(true, Cuboid.isValid(cube, toLong("10111000"))); - assertEquals(false, Cuboid.isValid(cube, toLong("10001000"))); - assertEquals(false, Cuboid.isValid(cube, toLong("10011000"))); + assertEquals(true, Cuboid.isValid(cuboidScheduler, toLong("10100000"))); + assertEquals(true, Cuboid.isValid(cuboidScheduler, toLong("10110000"))); + assertEquals(true, Cuboid.isValid(cuboidScheduler, toLong("10111000"))); + assertEquals(false, Cuboid.isValid(cuboidScheduler, toLong("10001000"))); + assertEquals(false, Cuboid.isValid(cuboidScheduler, toLong("10011000"))); } @Test public void testIsValid3() { CubeDesc cube = getSSBCubeDesc(); + CuboidScheduler cuboidScheduler = cube.getInitialCuboidScheduler(); - assertEquals(false, Cuboid.isValid(cube, toLong("10000000000"))); + assertEquals(false, Cuboid.isValid(cuboidScheduler, toLong("10000000000"))); // the 4th is mandatory and isMandatoryOnlyValid is true - assertEquals(true, Cuboid.isValid(cube, toLong("10000001000"))); - assertEquals(true, Cuboid.isValid(cube, toLong("00000001000"))); + assertEquals(true, Cuboid.isValid(cuboidScheduler, toLong("10000001000"))); + assertEquals(true, Cuboid.isValid(cuboidScheduler, toLong("00000001000"))); } @Test public void testFindCuboidByIdWithSingleAggrGroup2() { CubeDesc cube = getTestKylinCubeWithSeller(); + CuboidScheduler cuboidScheduler = cube.getInitialCuboidScheduler(); Cuboid cuboid; cuboid = Cuboid.findById(cube, 0); @@ -186,6 +191,7 @@ public class CuboidTest extends LocalFileMetadataTestCase { @Test public void testFindCuboidByIdWithMultiAggrGroup() { CubeDesc cube = getTestKylinCubeWithoutSellerLeftJoin(); + CuboidScheduler cuboidScheduler = cube.getInitialCuboidScheduler(); Cuboid cuboid; cuboid = Cuboid.findById(cube, toLong("111111110")); http://git-wip-us.apache.org/repos/asf/kylin/blob/721c64dd/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyDecoderTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyDecoderTest.java b/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyDecoderTest.java index 1d1d147..ec1f221 100644 --- a/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyDecoderTest.java +++ b/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyDecoderTest.java @@ -91,7 +91,7 @@ public class RowKeyDecoderTest extends LocalFileMetadataTestCase { data[7] = "15"; long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); - Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId); + Cuboid baseCuboid = Cuboid.findById(cube, baseCuboidId); RowKeyEncoder rowKeyEncoder = new RowKeyEncoder(cube.getFirstSegment(), baseCuboid); byte[] encodedKey = rowKeyEncoder.encode(data); http://git-wip-us.apache.org/repos/asf/kylin/blob/721c64dd/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyEncoderTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyEncoderTest.java b/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyEncoderTest.java index 75e2458..5af8d8a 100644 --- a/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyEncoderTest.java +++ b/core-cube/src/test/java/org/apache/kylin/cube/kv/RowKeyEncoderTest.java @@ -66,7 +66,7 @@ public class RowKeyEncoderTest extends LocalFileMetadataTestCase { data[7] = "15"; long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); - Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId); + Cuboid baseCuboid = Cuboid.findById(cube, baseCuboidId); RowKeyEncoder rowKeyEncoder = new RowKeyEncoder(cube.getFirstSegment(), baseCuboid); byte[] encodedKey = rowKeyEncoder.encode(data); @@ -97,7 +97,7 @@ public class RowKeyEncoderTest extends LocalFileMetadataTestCase { data[8] = "15"; long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); - Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId); + Cuboid baseCuboid = Cuboid.findById(cube, baseCuboidId); RowKeyEncoder rowKeyEncoder = new RowKeyEncoder(cube.getFirstSegment(), baseCuboid); byte[] encodedKey = rowKeyEncoder.encode(data); @@ -133,7 +133,7 @@ public class RowKeyEncoderTest extends LocalFileMetadataTestCase { data[8] = null; long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); - Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId); + Cuboid baseCuboid = Cuboid.findById(cube, baseCuboidId); RowKeyEncoder rowKeyEncoder = new RowKeyEncoder(cube.getFirstSegment(), baseCuboid); byte[] encodedKey = rowKeyEncoder.encode(data); http://git-wip-us.apache.org/repos/asf/kylin/blob/721c64dd/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java index b95253d..d7a5652 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java @@ -129,7 +129,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { Set<TblColRef> dimensionsD = new LinkedHashSet<TblColRef>(); dimensionsD.addAll(groupsD); dimensionsD.addAll(otherDimsD); - Cuboid cuboid = findCuboid(cubeDesc, dimensionsD, metrics); + Cuboid cuboid = findCuboid(cubeInstance, dimensionsD, metrics); context.setCuboid(cuboid); // set whether to aggr at storage @@ -160,8 +160,8 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { protected abstract String getGTStorage(); - protected Cuboid findCuboid(CubeDesc cubeDesc, Set<TblColRef> dimensionsD, Set<FunctionDesc> metrics) { - return Cuboid.identifyCuboid(cubeDesc, dimensionsD, metrics); + protected Cuboid findCuboid(CubeInstance cubeInstance, Set<TblColRef> dimensionsD, Set<FunctionDesc> metrics) { + return Cuboid.identifyCuboid(cubeInstance, dimensionsD, metrics); } protected ITupleConverter newCubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, int[] gtColIdx, TupleInfo tupleInfo) { http://git-wip-us.apache.org/repos/asf/kylin/blob/721c64dd/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java b/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java index bfddb1f..f49b728 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java @@ -84,7 +84,7 @@ public class HBaseKeyRange implements Comparable<HBaseKeyRange> { public HBaseKeyRange(Collection<TblColRef> dimensionColumns, Collection<ColumnValueRange> andDimensionRanges, CubeSegment cubeSeg, CubeDesc cubeDesc) { this.cubeSeg = cubeSeg; long cuboidId = this.calculateCuboidID(cubeDesc, dimensionColumns); - this.cuboid = Cuboid.findById(cubeDesc, cuboidId); + this.cuboid = Cuboid.findById(cubeSeg, cuboidId); this.flatOrAndFilter = Lists.newLinkedList(); this.flatOrAndFilter.add(andDimensionRanges); init(andDimensionRanges); http://git-wip-us.apache.org/repos/asf/kylin/blob/721c64dd/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java index 1ec23b6..f64365a 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java @@ -64,7 +64,7 @@ public class BatchCubingJobBuilder extends JobBuilderSupport { // Phase 3: Build Cube RowKeyDesc rowKeyDesc = seg.getCubeDesc().getRowkey(); - final int groupRowkeyColumnsCount = seg.getCubeDesc().getBuildLevel(); + final int groupRowkeyColumnsCount = seg.getCuboidScheduler().getBuildLevel(); // base cuboid step result.addTask(createBaseCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, 0), jobId)); // n dim cuboid steps http://git-wip-us.apache.org/repos/asf/kylin/blob/721c64dd/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java index 106077c..51c9056 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java @@ -76,7 +76,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport { } protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) { - final int maxLevel = seg.getCubeDesc().getBuildLevel(); + final int maxLevel = seg.getCuboidScheduler().getBuildLevel(); // base cuboid step result.addTask(createBaseCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, 0), jobId)); // n dim cuboid steps http://git-wip-us.apache.org/repos/asf/kylin/blob/721c64dd/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java index 07b636b..40f1ac5 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java @@ -18,7 +18,11 @@ package org.apache.kylin.engine.mr.common; -import com.google.common.collect.Sets; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import java.util.Set; + import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.cube.CubeSegment; @@ -35,10 +39,7 @@ import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Map; -import java.util.Set; +import com.google.common.collect.Sets; /** */ @@ -88,8 +89,7 @@ public class BaseCuboidBuilder implements java.io.Serializable { } private void init() { - long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); - baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId); + baseCuboid = Cuboid.getBaseCuboid(cubeDesc); initNullBytes(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/721c64dd/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java index e160d27..417697d 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java @@ -81,7 +81,7 @@ public class CubeStatsReader { public CubeStatsReader(CubeSegment cubeSegment, KylinConfig kylinConfig) throws IOException { ResourceStore store = ResourceStore.getStore(kylinConfig); - cuboidScheduler = cubeSegment.getCubeDesc().getCuboidScheduler(); + cuboidScheduler = cubeSegment.getCuboidScheduler(); String statsKey = cubeSegment.getStatisticsResourcePath(); File tmpSeqFile = writeTmpSeqFile(store.getResource(statsKey).inputStream); Reader reader = null; @@ -174,8 +174,7 @@ public class CubeStatsReader { public static Map<Long, Double> getCuboidSizeMapFromRowCount(CubeSegment cubeSegment, Map<Long, Long> rowCountMap) { final CubeDesc cubeDesc = cubeSegment.getCubeDesc(); final List<Integer> rowkeyColumnSize = Lists.newArrayList(); - final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); - final Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId); + final Cuboid baseCuboid = Cuboid.getBaseCuboid(cubeDesc); final List<TblColRef> columnList = baseCuboid.getColumns(); final CubeDimEncMap dimEncMap = cubeSegment.getDimensionEncodingMap(); @@ -185,7 +184,7 @@ public class CubeStatsReader { Map<Long, Double> sizeMap = Maps.newHashMap(); for (Map.Entry<Long, Long> entry : rowCountMap.entrySet()) { - sizeMap.put(entry.getKey(), estimateCuboidStorageSize(cubeSegment, entry.getKey(), entry.getValue(), baseCuboidId, rowkeyColumnSize)); + sizeMap.put(entry.getKey(), estimateCuboidStorageSize(cubeSegment, entry.getKey(), entry.getValue(), baseCuboid.getId(), rowkeyColumnSize)); } return sizeMap; } http://git-wip-us.apache.org/repos/asf/kylin/blob/721c64dd/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java index 8281759..16bc03c 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java @@ -88,7 +88,7 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB collectStatistics = Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_ENABLED)); if (collectStatistics) { samplingPercentage = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT)); - cuboidScheduler = cubeDesc.getCuboidScheduler(); + cuboidScheduler = cubeDesc.getInitialCuboidScheduler(); nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length; List<Long> cuboidIdList = Lists.newArrayList(); http://git-wip-us.apache.org/repos/asf/kylin/blob/721c64dd/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java index eee189c..5511414 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java @@ -96,7 +96,8 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr } int taskCount = config.getCubeAlgorithmInMemConcurrentThreads(); - DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(cube.getDescriptor(), flatDesc, dictionaryMap); + DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(cubeSegment.getCuboidScheduler(), flatDesc, + dictionaryMap); cubeBuilder.setReserveMemoryMB(calculateReserveMB(context.getConfiguration())); cubeBuilder.setConcurrentThreads(taskCount); http://git-wip-us.apache.org/repos/asf/kylin/blob/721c64dd/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java index aa323fd..2058bc9 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java @@ -95,7 +95,7 @@ public abstract class KVGTRecordWriter implements ICuboidWriter { protected abstract void writeAsKeyValue(ByteArrayWritable key, ByteArrayWritable value) throws IOException; private void initVariables(Long cuboidId) { - rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, Cuboid.findById(cubeDesc, cuboidId)); + rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, Cuboid.findById(cubeSegment, cuboidId)); keyBuf = rowKeyEncoder.createBuf(); dimensions = Long.bitCount(cuboidId); http://git-wip-us.apache.org/repos/asf/kylin/blob/721c64dd/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java index a01a928..c80283e 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java @@ -150,7 +150,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> { @Override public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException { long cuboidID = rowKeySplitter.split(key.getBytes()); - Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidID); + Cuboid cuboid = Cuboid.findById(cube, cuboidID); RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(cuboid); SplittedBytes[] splittedByteses = rowKeySplitter.getSplitBuffers(); http://git-wip-us.apache.org/repos/asf/kylin/blob/721c64dd/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java index 782ce72..6680fd7 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java @@ -75,7 +75,7 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> { cubeDesc = cube.getDescriptor(); ndCuboidBuilder = new NDCuboidBuilder(cubeSegment); // initialize CubiodScheduler - cuboidScheduler = cubeDesc.getCuboidScheduler(); + cuboidScheduler = cubeSegment.getCuboidScheduler(); rowKeySplitter = new RowKeySplitter(cubeSegment, 65, 256); } @@ -84,7 +84,7 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> { @Override public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException { long cuboidId = rowKeySplitter.split(key.getBytes()); - Cuboid parentCuboid = Cuboid.findById(cubeDesc, cuboidId); + Cuboid parentCuboid = Cuboid.findById(cuboidScheduler, cuboidId); Collection<Long> myChildren = cuboidScheduler.getSpanningCuboid(cuboidId); @@ -104,7 +104,7 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> { } for (Long child : myChildren) { - Cuboid childCuboid = Cuboid.findById(cubeDesc, child); + Cuboid childCuboid = Cuboid.findById(cuboidScheduler, child); Pair<Integer, ByteArray> result = ndCuboidBuilder.buildKey(parentCuboid, childCuboid, rowKeySplitter.getSplitBuffers()); outputKey.set(result.getSecond().array(), 0, result.getFirst()); context.write(outputKey, value); http://git-wip-us.apache.org/repos/asf/kylin/blob/721c64dd/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java index a094cc2..fd53c5a 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java @@ -189,7 +189,7 @@ public class SparkCubing extends AbstractApplication { final CubeDesc cubeDesc = cubeInstance.getDescriptor(); final HashMap<Integer, TblColRef> tblColRefMap = Maps.newHashMap(); final CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(seg), cubeDesc); - final List<TblColRef> baseCuboidColumn = Cuboid.findById(cubeDesc, Cuboid.getBaseCuboidId(cubeDesc)).getColumns(); + final List<TblColRef> baseCuboidColumn = Cuboid.findById(seg, Cuboid.getBaseCuboidId(cubeDesc)).getColumns(); final long start = System.currentTimeMillis(); final RowKeyDesc rowKey = cubeDesc.getRowkey(); for (int i = 0; i < baseCuboidColumn.size(); i++) { @@ -253,7 +253,7 @@ public class SparkCubing extends AbstractApplication { CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName); CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId); CubeDesc cubeDesc = cubeInstance.getDescriptor(); - CuboidScheduler cuboidScheduler = cubeDesc.getCuboidScheduler(); + CuboidScheduler cuboidScheduler = cubeInstance.getCuboidScheduler(); Set<Long> allCuboidIds = cuboidScheduler.getAllCuboidIds(); final HashMap<Long, HLLCounter> zeroValue = Maps.newHashMap(); for (Long id : allCuboidIds) { @@ -335,7 +335,7 @@ public class SparkCubing extends AbstractApplication { CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName); CubeDesc cubeDesc = cubeInstance.getDescriptor(); final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId); - List<TblColRef> baseCuboidColumn = Cuboid.findById(cubeDesc, Cuboid.getBaseCuboidId(cubeDesc)).getColumns(); + List<TblColRef> baseCuboidColumn = Cuboid.findById(cubeSegment, Cuboid.getBaseCuboidId(cubeDesc)).getColumns(); final Map<TblColRef, Integer> columnLengthMap = Maps.newHashMap(); final CubeDimEncMap dimEncMap = cubeSegment.getDimensionEncodingMap(); for (TblColRef tblColRef : baseCuboidColumn) { @@ -377,7 +377,8 @@ public class SparkCubing extends AbstractApplication { LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue(); System.out.println("load properties finished"); IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cubeSegment); - AbstractInMemCubeBuilder inMemCubeBuilder = new DoggedCubeBuilder(cubeInstance.getDescriptor(), flatDesc, dictionaryMap); + AbstractInMemCubeBuilder inMemCubeBuilder = new DoggedCubeBuilder( + cubeSegment.getCuboidScheduler(), flatDesc, dictionaryMap); final SparkCuboidWriter sparkCuboidWriter = new BufferedCuboidWriter(new DefaultTupleConverter(cubeInstance.getSegmentById(segmentId), columnLengthMap)); Executors.newCachedThreadPool().submit(inMemCubeBuilder.buildAsRunnable(blockingQueue, sparkCuboidWriter)); try {