KYLIN-3092 Synchronize w/r operations on entity-caching managers
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/f2f487fe Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/f2f487fe Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/f2f487fe Branch: refs/heads/master Commit: f2f487fe2dc868119562303cb8b9a0b630f704cf Parents: b8d7987 Author: Li Yang <liy...@apache.org> Authored: Sat Dec 9 00:23:51 2017 +0800 Committer: Dong Li <lid...@apache.org> Committed: Mon Dec 11 10:16:35 2017 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/cube/CubeDescManager.java | 311 +++-- .../org/apache/kylin/cube/CubeInstance.java | 31 +- .../java/org/apache/kylin/cube/CubeManager.java | 1079 +++++++++--------- .../kylin/cube/cli/CubeSignatureRefresher.java | 2 +- .../org/apache/kylin/cube/model/CubeDesc.java | 29 +- .../kylin/metadata/TableMetadataManager.java | 583 +++++----- .../kylin/metadata/TempStatementManager.java | 157 ++- .../org/apache/kylin/metadata/acl/TableACL.java | 11 + .../kylin/metadata/acl/TableACLManager.java | 111 +- .../metadata/cachesync/CachedCrudAssist.java | 72 +- .../kylin/metadata/model/DataModelDesc.java | 7 +- .../kylin/metadata/model/DataModelManager.java | 269 +++-- .../metadata/model/ExternalFilterDesc.java | 5 + .../apache/kylin/metadata/model/TableDesc.java | 79 +- .../kylin/metadata/model/TableExtDesc.java | 38 +- .../kylin/metadata/project/ProjectInstance.java | 2 +- .../kylin/metadata/project/ProjectManager.java | 12 +- .../metadata/streaming/StreamingConfig.java | 6 + .../metadata/streaming/StreamingManager.java | 183 +-- .../metadata/TempStatementManagerTest.java | 6 +- .../streaming/StreamingManagerTest.java | 68 ++ .../kylin/storage/hybrid/HybridInstance.java | 37 +- .../kylin/storage/hybrid/HybridManager.java | 133 ++- .../apache/kylin/engine/spark/SparkCubing.java | 4 +- .../DEFAULT_SESSION/temp_table1.json | 1 + .../DEFAULT_SESSION/temp_table2.json | 1 + .../kylin/provision/BuildCubeWithStream.java | 2 +- .../kylin/rest/controller/ModelController.java | 2 +- .../kylin/rest/service/KafkaConfigService.java | 2 +- .../kylin/rest/service/StreamingService.java | 4 +- .../kylin/source/kafka/KafkaConfigManager.java | 166 +-- .../kylin/source/kafka/config/KafkaConfig.java | 8 +- 32 files changed, 1746 insertions(+), 1675 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java index f724549..a58ba40 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java @@ -25,9 +25,9 @@ import java.util.Map; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.persistence.JsonSerializer; import org.apache.kylin.common.persistence.ResourceStore; -import org.apache.kylin.common.persistence.Serializer; +import org.apache.kylin.common.util.AutoReadWriteLock; +import org.apache.kylin.common.util.AutoReadWriteLock.AutoLock; import org.apache.kylin.cube.cuboid.CuboidManager; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.validation.CubeMetadataValidator; @@ -36,9 +36,9 @@ import org.apache.kylin.dimension.DictionaryDimEnc; import org.apache.kylin.dimension.DimensionEncoding; import org.apache.kylin.dimension.DimensionEncodingFactory; import org.apache.kylin.measure.topn.TopNMeasureType; -import org.apache.kylin.metadata.MetadataConstants; import org.apache.kylin.metadata.cachesync.Broadcaster; import org.apache.kylin.metadata.cachesync.Broadcaster.Event; +import org.apache.kylin.metadata.cachesync.CachedCrudAssist; import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache; import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.model.MeasureDesc; @@ -59,8 +59,6 @@ public class CubeDescManager { private static final Logger logger = LoggerFactory.getLogger(CubeDescManager.class); - public static final Serializer<CubeDesc> CUBE_DESC_SERIALIZER = new JsonSerializer<CubeDesc>(CubeDesc.class); - public static CubeDescManager getInstance(KylinConfig config) { return config.getManager(CubeDescManager.class); } @@ -69,20 +67,42 @@ public class CubeDescManager { static CubeDescManager newInstance(KylinConfig config) throws IOException { return new CubeDescManager(config); } - + // ============================================================================ private KylinConfig config; + // name ==> CubeDesc private CaseInsensitiveStringCache<CubeDesc> cubeDescMap; + private CachedCrudAssist<CubeDesc> crud; + + // protects concurrent operations around the cached map, to avoid for example + // writing an entity in the middle of reloading it (dirty read) + private AutoReadWriteLock descMapLock = new AutoReadWriteLock(); - private CubeDescManager(KylinConfig config) throws IOException { - logger.info("Initializing CubeDescManager with config " + config); - this.config = config; + private CubeDescManager(KylinConfig cfg) throws IOException { + logger.info("Initializing CubeDescManager with config " + cfg); + this.config = cfg; this.cubeDescMap = new CaseInsensitiveStringCache<CubeDesc>(config, "cube_desc"); + this.crud = new CachedCrudAssist<CubeDesc>(getStore(), ResourceStore.CUBE_DESC_RESOURCE_ROOT, CubeDesc.class, + cubeDescMap) { + @Override + protected CubeDesc initEntityAfterReload(CubeDesc cubeDesc, String resourceName) { + if (cubeDesc.isDraft()) + throw new IllegalArgumentException("CubeDesc '" + cubeDesc.getName() + "' must not be a draft"); + + try { + cubeDesc.init(config); + } catch (Exception e) { + logger.warn("Broken cube desc " + cubeDesc.resourceName(), e); + cubeDesc.addError(e.getMessage()); + } + return cubeDesc; + } + }; // touch lower level metadata before registering my listener - reloadAllCubeDesc(); + crud.reloadAll(); Broadcaster.getInstance(config).registerListener(new CubeDescSyncListener(), "cube_desc"); } @@ -93,7 +113,7 @@ public class CubeDescManager { for (IRealization real : ProjectManager.getInstance(config).listAllRealizations(project)) { if (real instanceof CubeInstance) { String descName = ((CubeInstance) real).getDescName(); - reloadCubeDescLocal(descName); + reloadCubeDescQuietly(descName); } } } @@ -108,7 +128,7 @@ public class CubeDescManager { if (event == Event.DROP) removeLocalCubeDesc(cubeDescName); else - reloadCubeDescLocal(cubeDescName); + reloadCubeDescQuietly(cubeDescName); for (ProjectInstance prj : ProjectManager.getInstance(config).findProjectsByModel(modelName)) { broadcaster.notifyProjectSchemaUpdate(prj.getName()); @@ -117,58 +137,45 @@ public class CubeDescManager { } public CubeDesc getCubeDesc(String name) { - return cubeDescMap.get(name); + try (AutoLock lock = descMapLock.lockForRead()) { + return cubeDescMap.get(name); + } } public List<CubeDesc> listAllDesc() { - return new ArrayList<CubeDesc>(cubeDescMap.values()); - } - - /** - * Reload CubeDesc from resource store It will be triggered by an desc - * update event. - * - * @param name - * @throws IOException - */ - public CubeDesc reloadCubeDescLocal(String name) throws IOException { - // Broken CubeDesc is not allowed to be saved and broadcast. - CubeDesc ndesc = loadCubeDesc(CubeDesc.concatResourcePath(name), false); - - cubeDescMap.putLocal(ndesc.getName(), ndesc); - clearCuboidCache(ndesc.getName()); - - // if related cube is in DESCBROKEN state before, change it back to DISABLED - CubeManager cubeManager = CubeManager.getInstance(config); - for (CubeInstance cube : cubeManager.getCubesByDesc(name)) { - if (cube.getStatus() == RealizationStatusEnum.DESCBROKEN) { - cubeManager.reloadCubeLocal(cube.getName()); - } + try (AutoLock lock = descMapLock.lockForRead()) { + return new ArrayList<CubeDesc>(cubeDescMap.values()); } - - return ndesc; } - - private CubeDesc loadCubeDesc(String path, boolean allowBroken) throws IOException { - ResourceStore store = getStore(); - CubeDesc ndesc = store.getResource(path, CubeDesc.class, CUBE_DESC_SERIALIZER); - if (ndesc == null) - throw new IllegalArgumentException("No cube desc found at " + path); - if (ndesc.isDraft()) - throw new IllegalArgumentException("CubeDesc '" + ndesc.getName() + "' must not be a draft"); - - try { - ndesc.init(config); + + public CubeDesc reloadCubeDescQuietly(String name) { + try (AutoLock lock = descMapLock.lockForWrite()) { + return reloadCubeDescLocal(name); } catch (Exception e) { - logger.warn("Broken cube desc " + path, e); - ndesc.addError(e.getMessage()); + logger.error("Failed to reload CubeDesc " + name, e); + return null; } + } - if (!allowBroken && !ndesc.getError().isEmpty()) { - throw new IllegalStateException("Cube desc at " + path + " has issues: " + ndesc.getError()); + public CubeDesc reloadCubeDescLocal(String name) throws IOException { + try (AutoLock lock = descMapLock.lockForWrite()) { + CubeDesc ndesc = crud.reload(name); + clearCuboidCache(name); + + // Broken CubeDesc is not allowed to be saved and broadcast. + if (ndesc.isBroken()) + throw new IllegalStateException("CubeDesc " + name + " is broken"); + + // if related cube is in DESCBROKEN state before, change it back to DISABLED + CubeManager cubeManager = CubeManager.getInstance(config); + for (CubeInstance cube : cubeManager.getCubesByDesc(name)) { + if (cube.getStatus() == RealizationStatusEnum.DESCBROKEN) { + cube.init(config); + } + } + + return ndesc; } - - return ndesc; } /** @@ -179,38 +186,83 @@ public class CubeDescManager { * @throws IOException */ public CubeDesc createCubeDesc(CubeDesc cubeDesc) throws IOException { - if (cubeDesc.getUuid() == null || cubeDesc.getName() == null) - throw new IllegalArgumentException(); - if (cubeDescMap.containsKey(cubeDesc.getName())) - throw new IllegalArgumentException("CubeDesc '" + cubeDesc.getName() + "' already exists"); - if (cubeDesc.isDraft()) - throw new IllegalArgumentException("CubeDesc '" + cubeDesc.getName() + "' must not be a draft"); - - try { - cubeDesc.init(config); - } catch (Exception e) { - logger.warn("Broken cube desc " + cubeDesc, e); - cubeDesc.addError(e.getMessage()); - } - postProcessCubeDesc(cubeDesc); - // Check base validation - if (!cubeDesc.getError().isEmpty()) { - return cubeDesc; - } - // Semantic validation - CubeMetadataValidator validator = new CubeMetadataValidator(); - ValidateContext context = validator.validate(cubeDesc); - if (!context.ifPass()) { + try (AutoLock lock = descMapLock.lockForWrite()) { + if (cubeDesc.getUuid() == null || cubeDesc.getName() == null) + throw new IllegalArgumentException(); + if (cubeDescMap.containsKey(cubeDesc.getName())) + throw new IllegalArgumentException("CubeDesc '" + cubeDesc.getName() + "' already exists"); + if (cubeDesc.isDraft()) + throw new IllegalArgumentException("CubeDesc '" + cubeDesc.getName() + "' must not be a draft"); + + try { + cubeDesc.init(config); + } catch (Exception e) { + logger.warn("Broken cube desc " + cubeDesc, e); + cubeDesc.addError(e.getMessage()); + } + + postProcessCubeDesc(cubeDesc); + // Check base validation + if (!cubeDesc.getError().isEmpty()) { + return cubeDesc; + } + // Semantic validation + CubeMetadataValidator validator = new CubeMetadataValidator(); + ValidateContext context = validator.validate(cubeDesc); + if (!context.ifPass()) { + return cubeDesc; + } + + cubeDesc.setSignature(cubeDesc.calculateSignature()); + + // save resource + crud.save(cubeDesc); + return cubeDesc; } + } + + /** + * Update CubeDesc with the input. Broadcast the event into cluster + * + * @param desc + * @return + * @throws IOException + */ + public CubeDesc updateCubeDesc(CubeDesc desc) throws IOException { + try (AutoLock lock = descMapLock.lockForWrite()) { + // Validate CubeDesc + if (desc.getUuid() == null || desc.getName() == null) + throw new IllegalArgumentException(); + String name = desc.getName(); + if (!cubeDescMap.containsKey(name)) + throw new IllegalArgumentException("CubeDesc '" + name + "' does not exist."); + if (desc.isDraft()) + throw new IllegalArgumentException("CubeDesc '" + desc.getName() + "' must not be a draft"); + + try { + desc.init(config); + } catch (Exception e) { + logger.warn("Broken cube desc " + desc, e); + desc.addError(e.getMessage()); + return desc; + } + + postProcessCubeDesc(desc); + // Semantic validation + CubeMetadataValidator validator = new CubeMetadataValidator(); + ValidateContext context = validator.validate(desc); + if (!context.ifPass()) { + return desc; + } - cubeDesc.setSignature(cubeDesc.calculateSignature()); + desc.setSignature(desc.calculateSignature()); - String path = cubeDesc.getResourcePath(); - getStore().putResource(path, cubeDesc, CUBE_DESC_SERIALIZER); - cubeDescMap.put(cubeDesc.getName(), cubeDesc); + // save resource + crud.save(desc); - return cubeDesc; + return desc; + } } /** @@ -259,16 +311,18 @@ public class CubeDescManager { // remove cubeDesc public void removeCubeDesc(CubeDesc cubeDesc) throws IOException { - String path = cubeDesc.getResourcePath(); - getStore().deleteResource(path); - cubeDescMap.remove(cubeDesc.getName()); - clearCuboidCache(cubeDesc.getName()); + try (AutoLock lock = descMapLock.lockForWrite()) { + crud.delete(cubeDesc); + clearCuboidCache(cubeDesc.getName()); + } } // remove cubeDesc public void removeLocalCubeDesc(String name) throws IOException { - cubeDescMap.removeLocal(name); - clearCuboidCache(name); + try (AutoLock lock = descMapLock.lockForWrite()) { + cubeDescMap.removeLocal(name); + clearCuboidCache(name); + } } private void clearCuboidCache(String descName) { @@ -276,87 +330,6 @@ public class CubeDescManager { CuboidManager.getInstance(config).clearCache(descName); } - private void reloadAllCubeDesc() throws IOException { - ResourceStore store = getStore(); - logger.info("Reloading Cube Metadata from folder " - + store.getReadableResourcePath(ResourceStore.CUBE_DESC_RESOURCE_ROOT)); - - cubeDescMap.clear(); - - List<String> paths = store.collectResourceRecursively(ResourceStore.CUBE_DESC_RESOURCE_ROOT, - MetadataConstants.FILE_SURFIX); - for (String path : paths) { - CubeDesc desc = null; - try { - desc = loadCubeDesc(path, true); - } catch (Exception e) { - logger.error("Error during load cube desc, skipping " + path, e); - continue; - } - - if (!path.equals(desc.getResourcePath())) { - logger.error( - "Skip suspicious desc at " + path + ", " + desc + " should be at " + desc.getResourcePath()); - continue; - } - if (cubeDescMap.containsKey(desc.getName())) { - logger.error("Dup CubeDesc name '" + desc.getName() + "' on path " + path); - continue; - } - - cubeDescMap.putLocal(desc.getName(), desc); - } - - logger.info("Loaded " + cubeDescMap.size() + " Cube Desc(s)"); - } - - /** - * Update CubeDesc with the input. Broadcast the event into cluster - * - * @param desc - * @return - * @throws IOException - */ - public CubeDesc updateCubeDesc(CubeDesc desc) throws IOException { - // Validate CubeDesc - if (desc.getUuid() == null || desc.getName() == null) - throw new IllegalArgumentException(); - String name = desc.getName(); - if (!cubeDescMap.containsKey(name)) - throw new IllegalArgumentException("CubeDesc '" + name + "' does not exist."); - if (desc.isDraft()) - throw new IllegalArgumentException("CubeDesc '" + desc.getName() + "' must not be a draft"); - - try { - desc.init(config); - } catch (Exception e) { - logger.warn("Broken cube desc " + desc, e); - desc.addError(e.getMessage()); - return desc; - } - - postProcessCubeDesc(desc); - // Semantic validation - CubeMetadataValidator validator = new CubeMetadataValidator(); - ValidateContext context = validator.validate(desc); - if (!context.ifPass()) { - return desc; - } - - desc.setSignature(desc.calculateSignature()); - - // Save Source - String path = desc.getResourcePath(); - getStore().putResource(path, desc, CUBE_DESC_SERIALIZER); - - // Reload the CubeDesc - CubeDesc ndesc = loadCubeDesc(path, false); - // Here replace the old one - cubeDescMap.put(ndesc.getName(), desc); - - return ndesc; - } - private ResourceStore getStore() { return ResourceStore.getStore(this.config); } http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java index d1c5166..1be7923 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java @@ -18,6 +18,8 @@ package org.apache.kylin.cube; +import static com.google.common.base.Preconditions.checkNotNull; + import java.io.IOException; import java.util.List; import java.util.Map; @@ -50,6 +52,8 @@ import org.apache.kylin.metadata.realization.IRealization; import org.apache.kylin.metadata.realization.RealizationStatusEnum; import org.apache.kylin.metadata.realization.RealizationType; import org.apache.kylin.metadata.realization.SQLDigest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; @@ -63,6 +67,8 @@ import com.google.common.collect.Lists; @SuppressWarnings("serial") @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) public class CubeInstance extends RootPersistentEntity implements IRealization, IBuildable { + private static final Logger logger = LoggerFactory.getLogger(CubeInstance.class); + public static final int COST_WEIGHT_MEASURE = 1; public static final int COST_WEIGHT_DIMENSION = 10; public static final int COST_WEIGHT_INNER_JOIN = 100; @@ -121,6 +127,24 @@ public class CubeInstance extends RootPersistentEntity implements IRealization, // default constructor for jackson public CubeInstance() { } + + void init(KylinConfig config) { + CubeDesc cubeDesc = CubeDescManager.getInstance(config).getCubeDesc(descName); + checkNotNull(cubeDesc, "cube descriptor '%s' (for cube '%s') not found", descName, name); + + if (cubeDesc.isBroken()) { + setStatus(RealizationStatusEnum.DESCBROKEN); + logger.error("cube descriptor {} (for cube '{}') is broken", cubeDesc.getResourcePath(), name); + for (String error : cubeDesc.getError()) { + logger.error("Error: {}", error); + } + } else if (getStatus() == RealizationStatusEnum.DESCBROKEN) { + setStatus(RealizationStatusEnum.DISABLED); + logger.info("cube {} changed from DESCBROKEN to DISABLED", name); + } + + setConfig((KylinConfigExt) cubeDesc.getConfig()); + } public CuboidScheduler getCuboidScheduler() { if (cuboidScheduler != null) @@ -174,9 +198,14 @@ public class CubeInstance extends RootPersistentEntity implements IRealization, return (getStatus() == RealizationStatusEnum.DISABLED || getStatus() == RealizationStatusEnum.DESCBROKEN) && segments.isEmpty(); } + + @Override + public String resourceName() { + return name; + } public String getResourcePath() { - return concatResourcePath(name); + return concatResourcePath(resourceName()); } public static String concatResourcePath(String cubeName) { http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index e00735c..3220a0f 100755 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -18,13 +18,9 @@ package org.apache.kylin.cube; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; - import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -37,10 +33,11 @@ import java.util.concurrent.ConcurrentMap; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.KylinConfigExt; import org.apache.kylin.common.persistence.JsonSerializer; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.Serializer; +import org.apache.kylin.common.util.AutoReadWriteLock; +import org.apache.kylin.common.util.AutoReadWriteLock.AutoLock; import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.cuboid.Cuboid; @@ -54,6 +51,7 @@ import org.apache.kylin.dict.lookup.SnapshotTable; import org.apache.kylin.metadata.TableMetadataManager; import org.apache.kylin.metadata.cachesync.Broadcaster; import org.apache.kylin.metadata.cachesync.Broadcaster.Event; +import org.apache.kylin.metadata.cachesync.CachedCrudAssist; import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache; import org.apache.kylin.metadata.model.DataModelDesc; import org.apache.kylin.metadata.model.JoinDesc; @@ -87,7 +85,7 @@ public class CubeManager implements IRealizationProvider { private static String ALPHA_NUM = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"; private static int HBASE_TABLE_LENGTH = 10; - public static final Serializer<CubeInstance> CUBE_SERIALIZER = new JsonSerializer<CubeInstance>(CubeInstance.class); + public static final Serializer<CubeInstance> CUBE_SERIALIZER = new JsonSerializer<>(CubeInstance.class); private static final Logger logger = LoggerFactory.getLogger(CubeManager.class); @@ -103,21 +101,41 @@ public class CubeManager implements IRealizationProvider { // ============================================================================ private KylinConfig config; + // cube name ==> CubeInstance private CaseInsensitiveStringCache<CubeInstance> cubeMap; - // "table/column" ==> lookup table - // private SingleValueCache<String, LookupStringTable> lookupTables = new SingleValueCache<String, LookupStringTable>(Broadcaster.TYPE.METADATA); + private CachedCrudAssist<CubeInstance> crud; + + // protects concurrent operations around the cached map, to avoid for example + // writing an entity in the middle of reloading it (dirty read) + private AutoReadWriteLock cubeMapLock = new AutoReadWriteLock(); // for generation hbase table name of a new segment private ConcurrentMap<String, String> usedStorageLocation = new ConcurrentHashMap<>(); - private CubeManager(KylinConfig config) throws IOException { + // a few inner classes to group related methods + private SegmentAssist segAssist = new SegmentAssist(); + private DictionaryAssist dictAssist = new DictionaryAssist(); + + private CubeManager(KylinConfig cfg) throws IOException { logger.info("Initializing CubeManager with config " + config); - this.config = config; + this.config = cfg; this.cubeMap = new CaseInsensitiveStringCache<CubeInstance>(config, "cube"); + this.crud = new CachedCrudAssist<CubeInstance>(getStore(), ResourceStore.CUBE_RESOURCE_ROOT, CubeInstance.class, + cubeMap) { + @Override + protected CubeInstance initEntityAfterReload(CubeInstance cube, String resourceName) { + cube.init(config); + + for (CubeSegment segment : cube.getSegments()) { + usedStorageLocation.put(segment.getUuid(), segment.getStorageLocationIdentifier()); + } + return cube; + } + }; // touch lower level metadata before registering my listener - loadAllCubeInstance(); + crud.reloadAll(); Broadcaster.getInstance(config).registerListener(new CubeSyncListener(), "cube"); } @@ -127,7 +145,7 @@ public class CubeManager implements IRealizationProvider { public void onProjectSchemaChange(Broadcaster broadcaster, String project) throws IOException { for (IRealization real : ProjectManager.getInstance(config).listAllRealizations(project)) { if (real instanceof CubeInstance) { - reloadCubeLocal(real.getName()); + reloadCubeQuietly(real.getName()); } } } @@ -140,7 +158,7 @@ public class CubeManager implements IRealizationProvider { if (event == Event.DROP) removeCubeLocal(cubeName); else - reloadCubeLocal(cubeName); + reloadCubeQuietly(cubeName); for (ProjectInstance prj : ProjectManager.getInstance(config).findProjects(RealizationType.CUBE, cubeName)) { @@ -150,20 +168,25 @@ public class CubeManager implements IRealizationProvider { } public List<CubeInstance> listAllCubes() { - return new ArrayList<CubeInstance>(cubeMap.values()); + try (AutoLock lock = cubeMapLock.lockForRead()) { + return new ArrayList<CubeInstance>(cubeMap.values()); + } } public CubeInstance getCube(String cubeName) { - return cubeMap.get(cubeName); + try (AutoLock lock = cubeMapLock.lockForRead()) { + return cubeMap.get(cubeName); + } } public CubeInstance getCubeByUuid(String uuid) { - Collection<CubeInstance> copy = new ArrayList<CubeInstance>(cubeMap.values()); - for (CubeInstance cube : copy) { - if (uuid.equals(cube.getUuid())) - return cube; + try (AutoLock lock = cubeMapLock.lockForRead()) { + for (CubeInstance cube : cubeMap.values()) { + if (uuid.equals(cube.getUuid())) + return cube; + } + return null; } - return null; } /** @@ -174,148 +197,57 @@ public class CubeManager implements IRealizationProvider { * @return */ public List<CubeInstance> getCubesByDesc(String descName) { - - List<CubeInstance> list = listAllCubes(); - List<CubeInstance> result = new ArrayList<CubeInstance>(); - Iterator<CubeInstance> it = list.iterator(); - while (it.hasNext()) { - CubeInstance ci = it.next(); - if (descName.equalsIgnoreCase(ci.getDescName())) { - result.add(ci); + try (AutoLock lock = cubeMapLock.lockForRead()) { + List<CubeInstance> list = listAllCubes(); + List<CubeInstance> result = new ArrayList<CubeInstance>(); + Iterator<CubeInstance> it = list.iterator(); + while (it.hasNext()) { + CubeInstance ci = it.next(); + if (descName.equalsIgnoreCase(ci.getDescName())) { + result.add(ci); + } } + return result; } - return result; - } - - public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable) - throws IOException { - CubeDesc cubeDesc = cubeSeg.getCubeDesc(); - if (!cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col)) - return null; - - String builderClass = cubeDesc.getDictionaryBuilderClass(col); - DictionaryInfo dictInfo = getDictionaryManager().buildDictionary(col, inpTable, builderClass); - - saveDictionaryInfo(cubeSeg, col, dictInfo); - return dictInfo; } - public DictionaryInfo saveDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable, - Dictionary<String> dict) throws IOException { - CubeDesc cubeDesc = cubeSeg.getCubeDesc(); - if (!cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col)) - return null; - - DictionaryInfo dictInfo = getDictionaryManager().saveDictionary(col, inpTable, dict); - - saveDictionaryInfo(cubeSeg, col, dictInfo); - return dictInfo; - } - - private void saveDictionaryInfo(CubeSegment cubeSeg, TblColRef col, DictionaryInfo dictInfo) throws IOException { - if (dictInfo != null) { - Dictionary<?> dict = dictInfo.getDictionaryObject(); - cubeSeg.putDictResPath(col, dictInfo.getResourcePath()); - cubeSeg.getRowkeyStats().add(new Object[] { col.getIdentity(), dict.getSize(), dict.getSizeOfId() }); - - CubeUpdate update = new CubeUpdate(cubeSeg.getCubeInstance()); - update.setToUpdateSegs(cubeSeg); - updateCube(update); - } - } - - /** - * return null if no dictionary for given column - */ - @SuppressWarnings("unchecked") - public Dictionary<String> getDictionary(CubeSegment cubeSeg, TblColRef col) { - DictionaryInfo info = null; - try { - DictionaryManager dictMgr = getDictionaryManager(); - String dictResPath = cubeSeg.getDictResPath(col); - if (dictResPath == null) - return null; - - info = dictMgr.getDictionaryInfo(dictResPath); - if (info == null) - throw new IllegalStateException("No dictionary found by " + dictResPath - + ", invalid cube state; cube segment" + cubeSeg + ", col " + col); - } catch (IOException e) { - throw new IllegalStateException("Failed to get dictionary for cube segment" + cubeSeg + ", col" + col, e); - } - return (Dictionary<String>) info.getDictionaryObject(); - } - - public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable) throws IOException { - TableMetadataManager metaMgr = getTableManager(); - SnapshotManager snapshotMgr = getSnapshotManager(); - - TableDesc tableDesc = new TableDesc(metaMgr.getTableDesc(lookupTable, cubeSeg.getProject())); - IReadableTable hiveTable = SourceFactory.createReadableTable(tableDesc); - SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc); - - cubeSeg.putSnapshotResPath(lookupTable, snapshot.getResourcePath()); - CubeUpdate cubeBuilder = new CubeUpdate(cubeSeg.getCubeInstance()); - cubeBuilder.setToUpdateSegs(cubeSeg); - updateCube(cubeBuilder); - - return snapshot; - } - - // sync on update - public CubeInstance dropCube(String cubeName, boolean deleteDesc) throws IOException { - logger.info("Dropping cube '" + cubeName + "'"); - // load projects before remove cube from project - - // delete cube instance and cube desc - CubeInstance cube = getCube(cubeName); - - // remove cube and update cache - getStore().deleteResource(cube.getResourcePath()); - cubeMap.remove(cube.getName()); - Cuboid.clearCache(cube); - - if (deleteDesc && cube.getDescriptor() != null) { - CubeDescManager.getInstance(config).removeCubeDesc(cube.getDescriptor()); - } - - // delete cube from project - ProjectManager.getInstance(config).removeRealizationsFromProjects(RealizationType.CUBE, cubeName); - - return cube; - } - - // sync on update public CubeInstance createCube(String cubeName, String projectName, CubeDesc desc, String owner) throws IOException { - logger.info("Creating cube '" + projectName + "-->" + cubeName + "' from desc '" + desc.getName() + "'"); + try (AutoLock lock = cubeMapLock.lockForWrite()) { + logger.info("Creating cube '" + projectName + "-->" + cubeName + "' from desc '" + desc.getName() + "'"); - // save cube resource - CubeInstance cube = CubeInstance.create(cubeName, desc); - cube.setOwner(owner); - updateCubeWithRetry(new CubeUpdate(cube), 0); + // save cube resource + CubeInstance cube = CubeInstance.create(cubeName, desc); + cube.setOwner(owner); + updateCubeWithRetry(new CubeUpdate(cube), 0); - ProjectManager.getInstance(config).moveRealizationToProject(RealizationType.CUBE, cubeName, projectName, owner); + ProjectManager.getInstance(config).moveRealizationToProject(RealizationType.CUBE, cubeName, projectName, + owner); - return cube; + return cube; + } } public CubeInstance createCube(CubeInstance cube, String projectName, String owner) throws IOException { - logger.info("Creating cube '" + projectName + "-->" + cube.getName() + "' from instance object. '"); + try (AutoLock lock = cubeMapLock.lockForWrite()) { + logger.info("Creating cube '" + projectName + "-->" + cube.getName() + "' from instance object. '"); - // save cube resource - cube.setOwner(owner); - updateCubeWithRetry(new CubeUpdate(cube), 0); + // save cube resource + cube.setOwner(owner); + updateCubeWithRetry(new CubeUpdate(cube), 0); - ProjectManager.getInstance(config).moveRealizationToProject(RealizationType.CUBE, cube.getName(), projectName, - owner); + ProjectManager.getInstance(config).moveRealizationToProject(RealizationType.CUBE, cube.getName(), + projectName, owner); - return cube; + return cube; + } } public CubeInstance updateCube(CubeUpdate update) throws IOException { - CubeInstance cube = updateCubeWithRetry(update, 0); - return cube; + try (AutoLock lock = cubeMapLock.lockForWrite()) { + CubeInstance cube = updateCubeWithRetry(update, 0); + return cube; + } } private CubeInstance updateCubeWithRetry(CubeUpdate update, int retry) throws IOException { @@ -378,7 +310,7 @@ public class CubeManager implements IRealizationProvider { } try { - getStore().putResource(cube.getResourcePath(), cube, CUBE_SERIALIZER); + crud.save(cube); } catch (IllegalStateException ise) { logger.warn("Write conflict to update cube " + cube.getName() + " at try " + retry + ", will retry..."); if (retry >= 7) { @@ -386,7 +318,7 @@ public class CubeManager implements IRealizationProvider { throw ise; } - cube = reloadCubeLocal(cube.getName()); + cube = crud.reload(cube.getName()); update.setCubeInstance(cube); retry++; cube = updateCubeWithRetry(update, retry); @@ -402,486 +334,607 @@ public class CubeManager implements IRealizationProvider { } } - cubeMap.put(cube.getName(), cube); - //this is a duplicate call to take care of scenarios where REST cache service unavailable ProjectManager.getInstance(cube.getConfig()).clearL2Cache(); return cube; } - // append a full build segment - public CubeSegment appendSegment(CubeInstance cube) throws IOException { - return appendSegment(cube, null, null, null, null); + public CubeInstance reloadCubeQuietly(String cubeName) { + try (AutoLock lock = cubeMapLock.lockForWrite()) { + CubeInstance cube = crud.reloadQuietly(cubeName); + if (cube != null) + Cuboid.clearCache(cube); + return cube; + } } - public CubeSegment appendSegment(CubeInstance cube, TSRange tsRange) throws IOException { - return appendSegment(cube, tsRange, null, null, null); + public void removeCubeLocal(String cubeName) { + try (AutoLock lock = cubeMapLock.lockForWrite()) { + CubeInstance cube = cubeMap.get(cubeName); + if (cube != null) { + cubeMap.removeLocal(cubeName); + for (CubeSegment segment : cube.getSegments()) { + usedStorageLocation.remove(segment.getUuid()); + } + Cuboid.clearCache(cube); + } + } } - public CubeSegment appendSegment(CubeInstance cube, SourcePartition src) throws IOException { - return appendSegment(cube, src.getTSRange(), src.getSegRange(), src.getSourcePartitionOffsetStart(), - src.getSourcePartitionOffsetEnd()); - } + public CubeInstance dropCube(String cubeName, boolean deleteDesc) throws IOException { + try (AutoLock lock = cubeMapLock.lockForWrite()) { + logger.info("Dropping cube '" + cubeName + "'"); + // load projects before remove cube from project - CubeSegment appendSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange, - Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd) - throws IOException { - checkInputRanges(tsRange, segRange); - checkBuildingSegment(cube); + // delete cube instance and cube desc + CubeInstance cube = getCube(cubeName); + + // remove cube and update cache + crud.delete(cube); + Cuboid.clearCache(cube); - // fix start/end a bit - if (cube.getModel().getPartitionDesc().isPartitioned()) { - // if missing start, set it to where last time ends - CubeSegment last = cube.getLastSegment(); - if (last != null && !last.isOffsetCube() && tsRange.start.v == 0) { - tsRange = new TSRange(last.getTSRange().end.v, tsRange.end.v); + if (deleteDesc && cube.getDescriptor() != null) { + CubeDescManager.getInstance(config).removeCubeDesc(cube.getDescriptor()); } - } else { - // full build - tsRange = null; - segRange = null; - } - CubeSegment newSegment = newSegment(cube, tsRange, segRange); - newSegment.setSourcePartitionOffsetStart(sourcePartitionOffsetStart); - newSegment.setSourcePartitionOffsetEnd(sourcePartitionOffsetEnd); - validateNewSegments(cube, newSegment); + // delete cube from project + ProjectManager.getInstance(config).removeRealizationsFromProjects(RealizationType.CUBE, cubeName); - CubeUpdate cubeBuilder = new CubeUpdate(cube); - cubeBuilder.setToAddSegs(newSegment); - updateCube(cubeBuilder); - return newSegment; + return cube; + } } - public CubeSegment refreshSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange) throws IOException { - checkInputRanges(tsRange, segRange); - checkBuildingSegment(cube); - - if (cube.getModel().getPartitionDesc().isPartitioned() == false) { - // full build - tsRange = null; - segRange = null; - } + @VisibleForTesting + /*private*/ String generateStorageLocation() { + String namePrefix = config.getHBaseTableNamePrefix(); + String tableName = ""; + Random ran = new Random(); + do { + StringBuffer sb = new StringBuffer(); + sb.append(namePrefix); + for (int i = 0; i < HBASE_TABLE_LENGTH; i++) { + sb.append(ALPHA_NUM.charAt(ran.nextInt(ALPHA_NUM.length()))); + } + tableName = sb.toString(); + } while (this.usedStorageLocation.containsValue(tableName)); + return tableName; + } - CubeSegment newSegment = newSegment(cube, tsRange, segRange); + private boolean isReady(CubeSegment seg) { + return seg.getStatus() == SegmentStatusEnum.READY; + } - Pair<Boolean, Boolean> pair = cube.getSegments().fitInSegments(newSegment); - if (pair.getFirst() == false || pair.getSecond() == false) - throw new IllegalArgumentException("The new refreshing segment " + newSegment - + " does not match any existing segment in cube " + cube); + private TableMetadataManager getTableManager() { + return TableMetadataManager.getInstance(config); + } - if (segRange != null) { - CubeSegment toRefreshSeg = null; - for (CubeSegment cubeSegment : cube.getSegments()) { - if (cubeSegment.getSegRange().equals(segRange)) { - toRefreshSeg = cubeSegment; - break; - } - } + private DictionaryManager getDictionaryManager() { + return DictionaryManager.getInstance(config); + } - if (toRefreshSeg == null) { - throw new IllegalArgumentException("For streaming cube, only one segment can be refreshed at one time"); - } + private SnapshotManager getSnapshotManager() { + return SnapshotManager.getInstance(config); + } - newSegment.setSourcePartitionOffsetStart(toRefreshSeg.getSourcePartitionOffsetStart()); - newSegment.setSourcePartitionOffsetEnd(toRefreshSeg.getSourcePartitionOffsetEnd()); - } + private ResourceStore getStore() { + return ResourceStore.getStore(this.config); + } - CubeUpdate cubeBuilder = new CubeUpdate(cube); - cubeBuilder.setToAddSegs(newSegment); - updateCube(cubeBuilder); + @Override + public RealizationType getRealizationType() { + return RealizationType.CUBE; + } - return newSegment; + @Override + public IRealization getRealization(String name) { + return getCube(name); } - public CubeSegment mergeSegments(CubeInstance cube, TSRange tsRange, SegmentRange segRange, boolean force) - throws IOException { - if (cube.getSegments().isEmpty()) - throw new IllegalArgumentException("Cube " + cube + " has no segments"); - - checkInputRanges(tsRange, segRange); - checkBuildingSegment(cube); - checkCubeIsPartitioned(cube); - - if (cube.getSegments().getFirstSegment().isOffsetCube()) { - // offset cube, merge by date range? - if (segRange == null && tsRange != null) { - Pair<CubeSegment, CubeSegment> pair = cube.getSegments(SegmentStatusEnum.READY) - .findMergeOffsetsByDateRange(tsRange, Long.MAX_VALUE); - if (pair == null) - throw new IllegalArgumentException("Find no segments to merge by " + tsRange + " for cube " + cube); - segRange = new SegmentRange(pair.getFirst().getSegRange().start, pair.getSecond().getSegRange().end); - } - tsRange = null; - Preconditions.checkArgument(segRange != null); - } else { - segRange = null; - Preconditions.checkArgument(tsRange != null); - } - - CubeSegment newSegment = newSegment(cube, tsRange, segRange); - - Segments<CubeSegment> mergingSegments = cube.getMergingSegments(newSegment); - if (mergingSegments.size() <= 1) - throw new IllegalArgumentException("Range " + newSegment.getSegRange() - + " must contain at least 2 segments, but there is " + mergingSegments.size()); - - CubeSegment first = mergingSegments.get(0); - CubeSegment last = mergingSegments.get(mergingSegments.size() - 1); - if (force == false) { - for (int i = 0; i < mergingSegments.size() - 1; i++) { - if (!mergingSegments.get(i).getSegRange().connects(mergingSegments.get(i + 1).getSegRange())) - throw new IllegalStateException("Merging segments must not have gaps between " - + mergingSegments.get(i) + " and " + mergingSegments.get(i + 1)); - } - } - if (first.isOffsetCube()) { - newSegment.setSegRange(new SegmentRange(first.getSegRange().start, last.getSegRange().end)); - newSegment.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetStart()); - newSegment.setSourcePartitionOffsetEnd(last.getSourcePartitionOffsetEnd()); - newSegment.setTSRange(null); - } else { - newSegment.setTSRange(new TSRange(mergingSegments.getTSStart(), mergingSegments.getTSEnd())); - newSegment.setSegRange(null); - } - - if (force == false) { - List<String> emptySegment = Lists.newArrayList(); - for (CubeSegment seg : mergingSegments) { - if (seg.getSizeKB() == 0) { - emptySegment.add(seg.getName()); - } - } + // ============================================================================ + // Segment related methods + // ============================================================================ - if (emptySegment.size() > 0) { - throw new IllegalArgumentException( - "Empty cube segment found, couldn't merge unless 'forceMergeEmptySegment' set to true: " - + emptySegment); - } - } + // append a full build segment + public CubeSegment appendSegment(CubeInstance cube) throws IOException { + return appendSegment(cube, null, null, null, null); + } - validateNewSegments(cube, newSegment); + public CubeSegment appendSegment(CubeInstance cube, TSRange tsRange) throws IOException { + return appendSegment(cube, tsRange, null, null, null); + } - CubeUpdate cubeBuilder = new CubeUpdate(cube); - cubeBuilder.setToAddSegs(newSegment); - updateCube(cubeBuilder); + public CubeSegment appendSegment(CubeInstance cube, SourcePartition src) throws IOException { + return appendSegment(cube, src.getTSRange(), src.getSegRange(), src.getSourcePartitionOffsetStart(), + src.getSourcePartitionOffsetEnd()); + } - return newSegment; + CubeSegment appendSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange, + Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd) + throws IOException { + try (AutoLock lock = cubeMapLock.lockForWrite()) { + return segAssist.appendSegment(cube, tsRange, segRange, sourcePartitionOffsetStart, + sourcePartitionOffsetEnd); + } } - private void checkInputRanges(TSRange tsRange, SegmentRange segRange) { - if (tsRange != null && segRange != null) { - throw new IllegalArgumentException( - "Build or refresh cube segment either by TSRange or by SegmentRange, not both."); + public CubeSegment refreshSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange) throws IOException { + try (AutoLock lock = cubeMapLock.lockForWrite()) { + return segAssist.refreshSegment(cube, tsRange, segRange); } } - private void checkBuildingSegment(CubeInstance cube) { - int maxBuldingSeg = cube.getConfig().getMaxBuildingSegments(); - if (cube.getBuildingSegments().size() >= maxBuldingSeg) { - throw new IllegalStateException( - "There is already " + cube.getBuildingSegments().size() + " building segment; "); + public CubeSegment mergeSegments(CubeInstance cube, TSRange tsRange, SegmentRange segRange, boolean force) + throws IOException { + try (AutoLock lock = cubeMapLock.lockForWrite()) { + return segAssist.mergeSegments(cube, tsRange, segRange, force); } } - private void checkCubeIsPartitioned(CubeInstance cube) { - if (cube.getDescriptor().getModel().getPartitionDesc().isPartitioned() == false) { - throw new IllegalStateException( - "there is no partition date column specified, only full build is supported"); + public void promoteNewlyBuiltSegments(CubeInstance cube, CubeSegment newSegment) throws IOException { + try (AutoLock lock = cubeMapLock.lockForWrite()) { + segAssist.promoteNewlyBuiltSegments(cube, newSegment); } } - /** - * After cube update, reload cube related cache - * - * @param cubeName - */ - public CubeInstance reloadCubeLocal(String cubeName) { - CubeInstance cubeInstance = reloadCubeLocalAt(CubeInstance.concatResourcePath(cubeName)); - if (cubeInstance != null) - Cuboid.clearCache(cubeInstance); - return cubeInstance; + public void validateNewSegments(CubeInstance cube, CubeSegment newSegments) { + segAssist.validateNewSegments(cube, newSegments); } - public void removeCubeLocal(String cubeName) { - CubeInstance cube = cubeMap.get(cubeName); - if (cube != null) { - cubeMap.removeLocal(cubeName); - for (CubeSegment segment : cube.getSegments()) { - usedStorageLocation.remove(segment.getUuid()); - } - Cuboid.clearCache(cube); - } + public List<CubeSegment> calculateHoles(String cubeName) { + return segAssist.calculateHoles(cubeName); } - public LookupStringTable getLookupTable(CubeSegment cubeSegment, JoinDesc join) { + private class SegmentAssist { + + CubeSegment appendSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange, + Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd) + throws IOException { + checkInputRanges(tsRange, segRange); + checkBuildingSegment(cube); + + // fix start/end a bit + if (cube.getModel().getPartitionDesc().isPartitioned()) { + // if missing start, set it to where last time ends + CubeSegment last = cube.getLastSegment(); + if (last != null && !last.isOffsetCube() && tsRange.start.v == 0) { + tsRange = new TSRange(last.getTSRange().end.v, tsRange.end.v); + } + } else { + // full build + tsRange = null; + segRange = null; + } - String tableName = join.getPKSide().getTableIdentity(); - String[] pkCols = join.getPrimaryKey(); - String snapshotResPath = cubeSegment.getSnapshotResPath(tableName); - if (snapshotResPath == null) - throw new IllegalStateException("No snapshot for table '" + tableName + "' found on cube segment" - + cubeSegment.getCubeInstance().getName() + "/" + cubeSegment); + CubeSegment newSegment = newSegment(cube, tsRange, segRange); + newSegment.setSourcePartitionOffsetStart(sourcePartitionOffsetStart); + newSegment.setSourcePartitionOffsetEnd(sourcePartitionOffsetEnd); + validateNewSegments(cube, newSegment); - try { - SnapshotTable snapshot = getSnapshotManager().getSnapshotTable(snapshotResPath); - TableDesc tableDesc = getTableManager().getTableDesc(tableName, cubeSegment.getProject()); - return new LookupStringTable(tableDesc, pkCols, snapshot); - } catch (IOException e) { - throw new IllegalStateException( - "Failed to load lookup table " + tableName + " from snapshot " + snapshotResPath, e); + CubeUpdate cubeBuilder = new CubeUpdate(cube); + cubeBuilder.setToAddSegs(newSegment); + updateCube(cubeBuilder); + return newSegment; } - } - private CubeSegment newSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange) { - DataModelDesc modelDesc = cube.getModel(); + public CubeSegment refreshSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange) + throws IOException { + checkInputRanges(tsRange, segRange); + checkBuildingSegment(cube); - CubeSegment segment = new CubeSegment(); - segment.setUuid(UUID.randomUUID().toString()); - segment.setName(CubeSegment.makeSegmentName(tsRange, segRange, modelDesc)); - segment.setCreateTimeUTC(System.currentTimeMillis()); - segment.setCubeInstance(cube); + if (cube.getModel().getPartitionDesc().isPartitioned() == false) { + // full build + tsRange = null; + segRange = null; + } - // let full build range be backward compatible - if (tsRange == null && segRange == null) - tsRange = new TSRange(0L, Long.MAX_VALUE); + CubeSegment newSegment = newSegment(cube, tsRange, segRange); - segment.setTSRange(tsRange); - segment.setSegRange(segRange); - segment.setStatus(SegmentStatusEnum.NEW); - segment.setStorageLocationIdentifier(generateStorageLocation()); + Pair<Boolean, Boolean> pair = cube.getSegments().fitInSegments(newSegment); + if (pair.getFirst() == false || pair.getSecond() == false) + throw new IllegalArgumentException("The new refreshing segment " + newSegment + + " does not match any existing segment in cube " + cube); - segment.setCubeInstance(cube); + if (segRange != null) { + CubeSegment toRefreshSeg = null; + for (CubeSegment cubeSegment : cube.getSegments()) { + if (cubeSegment.getSegRange().equals(segRange)) { + toRefreshSeg = cubeSegment; + break; + } + } - segment.validate(); - return segment; - } + if (toRefreshSeg == null) { + throw new IllegalArgumentException( + "For streaming cube, only one segment can be refreshed at one time"); + } - @VisibleForTesting - /*private*/ String generateStorageLocation() { - String namePrefix = config.getHBaseTableNamePrefix(); - String tableName = ""; - Random ran = new Random(); - do { - StringBuffer sb = new StringBuffer(); - sb.append(namePrefix); - for (int i = 0; i < HBASE_TABLE_LENGTH; i++) { - sb.append(ALPHA_NUM.charAt(ran.nextInt(ALPHA_NUM.length()))); + newSegment.setSourcePartitionOffsetStart(toRefreshSeg.getSourcePartitionOffsetStart()); + newSegment.setSourcePartitionOffsetEnd(toRefreshSeg.getSourcePartitionOffsetEnd()); } - tableName = sb.toString(); - } while (this.usedStorageLocation.containsValue(tableName)); - return tableName; - } - public void promoteNewlyBuiltSegments(CubeInstance cube, CubeSegment newSegment) throws IOException { - if (StringUtils.isBlank(newSegment.getStorageLocationIdentifier())) - throw new IllegalStateException( - "For cube " + cube + ", segment " + newSegment + " missing StorageLocationIdentifier"); + CubeUpdate cubeBuilder = new CubeUpdate(cube); + cubeBuilder.setToAddSegs(newSegment); + updateCube(cubeBuilder); - if (StringUtils.isBlank(newSegment.getLastBuildJobID())) - throw new IllegalStateException("For cube " + cube + ", segment " + newSegment + " missing LastBuildJobID"); - - if (isReady(newSegment) == true) { - logger.warn("For cube " + cube + ", segment " + newSegment + " state should be NEW but is READY"); + return newSegment; } - List<CubeSegment> tobe = cube.calculateToBeSegments(newSegment); + public CubeSegment mergeSegments(CubeInstance cube, TSRange tsRange, SegmentRange segRange, boolean force) + throws IOException { + if (cube.getSegments().isEmpty()) + throw new IllegalArgumentException("Cube " + cube + " has no segments"); + + checkInputRanges(tsRange, segRange); + checkBuildingSegment(cube); + checkCubeIsPartitioned(cube); + + if (cube.getSegments().getFirstSegment().isOffsetCube()) { + // offset cube, merge by date range? + if (segRange == null && tsRange != null) { + Pair<CubeSegment, CubeSegment> pair = cube.getSegments(SegmentStatusEnum.READY) + .findMergeOffsetsByDateRange(tsRange, Long.MAX_VALUE); + if (pair == null) + throw new IllegalArgumentException( + "Find no segments to merge by " + tsRange + " for cube " + cube); + segRange = new SegmentRange(pair.getFirst().getSegRange().start, + pair.getSecond().getSegRange().end); + } + tsRange = null; + Preconditions.checkArgument(segRange != null); + } else { + segRange = null; + Preconditions.checkArgument(tsRange != null); + } + + CubeSegment newSegment = newSegment(cube, tsRange, segRange); + + Segments<CubeSegment> mergingSegments = cube.getMergingSegments(newSegment); + if (mergingSegments.size() <= 1) + throw new IllegalArgumentException("Range " + newSegment.getSegRange() + + " must contain at least 2 segments, but there is " + mergingSegments.size()); + + CubeSegment first = mergingSegments.get(0); + CubeSegment last = mergingSegments.get(mergingSegments.size() - 1); + if (force == false) { + for (int i = 0; i < mergingSegments.size() - 1; i++) { + if (!mergingSegments.get(i).getSegRange().connects(mergingSegments.get(i + 1).getSegRange())) + throw new IllegalStateException("Merging segments must not have gaps between " + + mergingSegments.get(i) + " and " + mergingSegments.get(i + 1)); + } + } + if (first.isOffsetCube()) { + newSegment.setSegRange(new SegmentRange(first.getSegRange().start, last.getSegRange().end)); + newSegment.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetStart()); + newSegment.setSourcePartitionOffsetEnd(last.getSourcePartitionOffsetEnd()); + newSegment.setTSRange(null); + } else { + newSegment.setTSRange(new TSRange(mergingSegments.getTSStart(), mergingSegments.getTSEnd())); + newSegment.setSegRange(null); + } + + if (force == false) { + List<String> emptySegment = Lists.newArrayList(); + for (CubeSegment seg : mergingSegments) { + if (seg.getSizeKB() == 0) { + emptySegment.add(seg.getName()); + } + } + + if (emptySegment.size() > 0) { + throw new IllegalArgumentException( + "Empty cube segment found, couldn't merge unless 'forceMergeEmptySegment' set to true: " + + emptySegment); + } + } - if (tobe.contains(newSegment) == false) - throw new IllegalStateException( - "For cube " + cube + ", segment " + newSegment + " is expected but not in the tobe " + tobe); + validateNewSegments(cube, newSegment); - newSegment.setStatus(SegmentStatusEnum.READY); + CubeUpdate cubeBuilder = new CubeUpdate(cube); + cubeBuilder.setToAddSegs(newSegment); + updateCube(cubeBuilder); - List<CubeSegment> toRemoveSegs = Lists.newArrayList(); - for (CubeSegment segment : cube.getSegments()) { - if (!tobe.contains(segment)) - toRemoveSegs.add(segment); + return newSegment; } - logger.info("Promoting cube " + cube + ", new segment " + newSegment + ", to remove segments " + toRemoveSegs); + private void checkInputRanges(TSRange tsRange, SegmentRange segRange) { + if (tsRange != null && segRange != null) { + throw new IllegalArgumentException( + "Build or refresh cube segment either by TSRange or by SegmentRange, not both."); + } + } - CubeUpdate cubeBuilder = new CubeUpdate(cube); - cubeBuilder.setToRemoveSegs(toRemoveSegs.toArray(new CubeSegment[toRemoveSegs.size()])) - .setToUpdateSegs(newSegment).setStatus(RealizationStatusEnum.READY); - updateCube(cubeBuilder); - } + private void checkBuildingSegment(CubeInstance cube) { + int maxBuldingSeg = cube.getConfig().getMaxBuildingSegments(); + if (cube.getBuildingSegments().size() >= maxBuldingSeg) { + throw new IllegalStateException( + "There is already " + cube.getBuildingSegments().size() + " building segment; "); + } + } - public void validateNewSegments(CubeInstance cube, CubeSegment newSegments) { - List<CubeSegment> tobe = cube.calculateToBeSegments(newSegments); - List<CubeSegment> newList = Arrays.asList(newSegments); - if (tobe.containsAll(newList) == false) { - throw new IllegalStateException("For cube " + cube + ", the new segments " + newList - + " do not fit in its current " + cube.getSegments() + "; the resulted tobe is " + tobe); + private void checkCubeIsPartitioned(CubeInstance cube) { + if (cube.getDescriptor().getModel().getPartitionDesc().isPartitioned() == false) { + throw new IllegalStateException( + "there is no partition date column specified, only full build is supported"); + } } - } - private boolean isReady(CubeSegment seg) { - return seg.getStatus() == SegmentStatusEnum.READY; - } + private CubeSegment newSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange) { + DataModelDesc modelDesc = cube.getModel(); - private void loadAllCubeInstance() throws IOException { - ResourceStore store = getStore(); - List<String> paths = store.collectResourceRecursively(ResourceStore.CUBE_RESOURCE_ROOT, ".json"); + CubeSegment segment = new CubeSegment(); + segment.setUuid(UUID.randomUUID().toString()); + segment.setName(CubeSegment.makeSegmentName(tsRange, segRange, modelDesc)); + segment.setCreateTimeUTC(System.currentTimeMillis()); + segment.setCubeInstance(cube); - logger.info("Loading Cube from folder " + store.getReadableResourcePath(ResourceStore.CUBE_RESOURCE_ROOT)); + // let full build range be backward compatible + if (tsRange == null && segRange == null) + tsRange = new TSRange(0L, Long.MAX_VALUE); - int succeed = 0; - int fail = 0; - for (String path : paths) { - CubeInstance cube = reloadCubeLocalAt(path); - if (cube == null) { - fail++; - } else { - succeed++; - } + segment.setTSRange(tsRange); + segment.setSegRange(segRange); + segment.setStatus(SegmentStatusEnum.NEW); + segment.setStorageLocationIdentifier(generateStorageLocation()); + + segment.setCubeInstance(cube); + + segment.validate(); + return segment; } - logger.info("Loaded " + succeed + " cubes, fail on " + fail + " cubes"); - } + public void promoteNewlyBuiltSegments(CubeInstance cube, CubeSegment newSegment) throws IOException { + if (StringUtils.isBlank(newSegment.getStorageLocationIdentifier())) + throw new IllegalStateException( + "For cube " + cube + ", segment " + newSegment + " missing StorageLocationIdentifier"); - private CubeInstance reloadCubeLocalAt(String path) { - ResourceStore store = getStore(); - CubeInstance cube; + if (StringUtils.isBlank(newSegment.getLastBuildJobID())) + throw new IllegalStateException( + "For cube " + cube + ", segment " + newSegment + " missing LastBuildJobID"); - try { - cube = store.getResource(path, CubeInstance.class, CUBE_SERIALIZER); - if (cube == null) { - return cube; + if (isReady(newSegment) == true) { + logger.warn("For cube " + cube + ", segment " + newSegment + " state should be NEW but is READY"); } - String cubeName = cube.getName(); - checkState(StringUtils.isNotBlank(cubeName), "cube (at %s) name must not be blank", path); + List<CubeSegment> tobe = cube.calculateToBeSegments(newSegment); - CubeDesc cubeDesc = CubeDescManager.getInstance(config).getCubeDesc(cube.getDescName()); - checkNotNull(cubeDesc, "cube descriptor '%s' (for cube '%s') not found", cube.getDescName(), cubeName); + if (tobe.contains(newSegment) == false) + throw new IllegalStateException( + "For cube " + cube + ", segment " + newSegment + " is expected but not in the tobe " + tobe); - if (!cubeDesc.getError().isEmpty()) { - cube.setStatus(RealizationStatusEnum.DESCBROKEN); - logger.error("cube descriptor {} (for cube '{}') is broken", cubeDesc.getResourcePath(), cubeName); - for (String error : cubeDesc.getError()) { - logger.error("Error: {}", error); - } - } else if (cube.getStatus() == RealizationStatusEnum.DESCBROKEN) { - cube.setStatus(RealizationStatusEnum.DISABLED); - logger.info("cube {} changed from DESCBROKEN to DISABLED", cubeName); + newSegment.setStatus(SegmentStatusEnum.READY); + + List<CubeSegment> toRemoveSegs = Lists.newArrayList(); + for (CubeSegment segment : cube.getSegments()) { + if (!tobe.contains(segment)) + toRemoveSegs.add(segment); } - cube.setConfig((KylinConfigExt) cubeDesc.getConfig()); - cubeMap.putLocal(cubeName, cube); + logger.info( + "Promoting cube " + cube + ", new segment " + newSegment + ", to remove segments " + toRemoveSegs); - for (CubeSegment segment : cube.getSegments()) { - usedStorageLocation.put(segment.getUuid(), segment.getStorageLocationIdentifier()); + CubeUpdate cubeBuilder = new CubeUpdate(cube); + cubeBuilder.setToRemoveSegs(toRemoveSegs.toArray(new CubeSegment[toRemoveSegs.size()])) + .setToUpdateSegs(newSegment).setStatus(RealizationStatusEnum.READY); + updateCube(cubeBuilder); + } + + public void validateNewSegments(CubeInstance cube, CubeSegment newSegments) { + List<CubeSegment> tobe = cube.calculateToBeSegments(newSegments); + List<CubeSegment> newList = Arrays.asList(newSegments); + if (tobe.containsAll(newList) == false) { + throw new IllegalStateException("For cube " + cube + ", the new segments " + newList + + " do not fit in its current " + cube.getSegments() + "; the resulted tobe is " + tobe); } + } - logger.info("Reloaded cube {} being {} having {} segments", cubeName, cube, cube.getSegments().size()); - return cube; + /** + * Calculate the holes (gaps) in segments. + * @param cubeName + * @return + */ + public List<CubeSegment> calculateHoles(String cubeName) { + List<CubeSegment> holes = Lists.newArrayList(); + final CubeInstance cube = getCube(cubeName); + DataModelDesc modelDesc = cube.getModel(); + Preconditions.checkNotNull(cube); + final List<CubeSegment> segments = cube.getSegments(); + logger.info("totally " + segments.size() + " cubeSegments"); + if (segments.size() == 0) { + return holes; + } - } catch (Exception e) { - logger.error("Error during load cube instance, skipping : " + path, e); - return null; + Collections.sort(segments); + for (int i = 0; i < segments.size() - 1; ++i) { + CubeSegment first = segments.get(i); + CubeSegment second = segments.get(i + 1); + if (first.getSegRange().connects(second.getSegRange())) + continue; + + if (first.getSegRange().apartBefore(second.getSegRange())) { + CubeSegment hole = new CubeSegment(); + hole.setCubeInstance(cube); + if (first.isOffsetCube()) { + hole.setSegRange(new SegmentRange(first.getSegRange().end, second.getSegRange().start)); + hole.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetEnd()); + hole.setSourcePartitionOffsetEnd(second.getSourcePartitionOffsetStart()); + hole.setName(CubeSegment.makeSegmentName(null, hole.getSegRange(), modelDesc)); + } else { + hole.setTSRange(new TSRange(first.getTSRange().end.v, second.getTSRange().start.v)); + hole.setName(CubeSegment.makeSegmentName(hole.getTSRange(), null, modelDesc)); + } + holes.add(hole); + } + } + return holes; } + } - private TableMetadataManager getTableManager() { - return TableMetadataManager.getInstance(config); + // ============================================================================ + // Dictionary/Snapshot related methods + // ============================================================================ + + public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable) + throws IOException { + return dictAssist.buildDictionary(cubeSeg, col, inpTable); } - private DictionaryManager getDictionaryManager() { - return DictionaryManager.getInstance(config); + public DictionaryInfo saveDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable, + Dictionary<String> dict) throws IOException { + return dictAssist.saveDictionary(cubeSeg, col, inpTable, dict); } - private SnapshotManager getSnapshotManager() { - return SnapshotManager.getInstance(config); + /** + * return null if no dictionary for given column + */ + public Dictionary<String> getDictionary(CubeSegment cubeSeg, TblColRef col) { + return dictAssist.getDictionary(cubeSeg, col); } - private ResourceStore getStore() { - return ResourceStore.getStore(this.config); + public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable) throws IOException { + return dictAssist.buildSnapshotTable(cubeSeg, lookupTable); } - @Override - public RealizationType getRealizationType() { - return RealizationType.CUBE; + public LookupStringTable getLookupTable(CubeSegment cubeSegment, JoinDesc join) { + return dictAssist.getLookupTable(cubeSegment, join); } - @Override - public IRealization getRealization(String name) { - return getCube(name); + //UHC (ultra high cardinality column): contain the ShardByColumns and the GlobalDictionaryColumns + public int[] getUHCIndex(CubeDesc cubeDesc) throws IOException { + return dictAssist.getUHCIndex(cubeDesc); } - /** - * Calculate the holes (gaps) in segments. - * @param cubeName - * @return - */ - public List<CubeSegment> calculateHoles(String cubeName) { - List<CubeSegment> holes = Lists.newArrayList(); - final CubeInstance cube = getCube(cubeName); - DataModelDesc modelDesc = cube.getModel(); - Preconditions.checkNotNull(cube); - final List<CubeSegment> segments = cube.getSegments(); - logger.info("totally " + segments.size() + " cubeSegments"); - if (segments.size() == 0) { - return holes; + private class DictionaryAssist { + public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable) + throws IOException { + CubeDesc cubeDesc = cubeSeg.getCubeDesc(); + if (!cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col)) + return null; + + String builderClass = cubeDesc.getDictionaryBuilderClass(col); + DictionaryInfo dictInfo = getDictionaryManager().buildDictionary(col, inpTable, builderClass); + + saveDictionaryInfo(cubeSeg, col, dictInfo); + return dictInfo; } - Collections.sort(segments); - for (int i = 0; i < segments.size() - 1; ++i) { - CubeSegment first = segments.get(i); - CubeSegment second = segments.get(i + 1); - if (first.getSegRange().connects(second.getSegRange())) - continue; - - if (first.getSegRange().apartBefore(second.getSegRange())) { - CubeSegment hole = new CubeSegment(); - hole.setCubeInstance(cube); - if (first.isOffsetCube()) { - hole.setSegRange(new SegmentRange(first.getSegRange().end, second.getSegRange().start)); - hole.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetEnd()); - hole.setSourcePartitionOffsetEnd(second.getSourcePartitionOffsetStart()); - hole.setName(CubeSegment.makeSegmentName(null, hole.getSegRange(), modelDesc)); - } else { - hole.setTSRange(new TSRange(first.getTSRange().end.v, second.getTSRange().start.v)); - hole.setName(CubeSegment.makeSegmentName(hole.getTSRange(), null, modelDesc)); - } - holes.add(hole); + public DictionaryInfo saveDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable, + Dictionary<String> dict) throws IOException { + CubeDesc cubeDesc = cubeSeg.getCubeDesc(); + if (!cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col)) + return null; + + DictionaryInfo dictInfo = getDictionaryManager().saveDictionary(col, inpTable, dict); + + saveDictionaryInfo(cubeSeg, col, dictInfo); + return dictInfo; + } + + private void saveDictionaryInfo(CubeSegment cubeSeg, TblColRef col, DictionaryInfo dictInfo) + throws IOException { + if (dictInfo != null) { + Dictionary<?> dict = dictInfo.getDictionaryObject(); + cubeSeg.putDictResPath(col, dictInfo.getResourcePath()); + cubeSeg.getRowkeyStats().add(new Object[] { col.getIdentity(), dict.getSize(), dict.getSizeOfId() }); + + CubeUpdate update = new CubeUpdate(cubeSeg.getCubeInstance()); + update.setToUpdateSegs(cubeSeg); + updateCube(update); } } - return holes; - } - private final String GLOBAL_DICTIONNARY_CLASS = "org.apache.kylin.dict.GlobalDictionaryBuilder"; + /** + * return null if no dictionary for given column + */ + @SuppressWarnings("unchecked") + public Dictionary<String> getDictionary(CubeSegment cubeSeg, TblColRef col) { + DictionaryInfo info = null; + try { + DictionaryManager dictMgr = getDictionaryManager(); + String dictResPath = cubeSeg.getDictResPath(col); + if (dictResPath == null) + return null; + + info = dictMgr.getDictionaryInfo(dictResPath); + if (info == null) + throw new IllegalStateException("No dictionary found by " + dictResPath + + ", invalid cube state; cube segment" + cubeSeg + ", col " + col); + } catch (IOException e) { + throw new IllegalStateException("Failed to get dictionary for cube segment" + cubeSeg + ", col" + col, + e); + } + return (Dictionary<String>) info.getDictionaryObject(); + } - //UHC (ultra high cardinality column): contain the ShardByColumns and the GlobalDictionaryColumns - public int[] getUHCIndex(CubeDesc cubeDesc) throws IOException { - List<TblColRef> dictCols = Lists.newArrayList(cubeDesc.getAllColumnsNeedDictionaryBuilt()); - int[] uhcIndex = new int[dictCols.size()]; - - //add GlobalDictionaryColumns - List<DictionaryDesc> dictionaryDescList = cubeDesc.getDictionaries(); - if (dictionaryDescList != null) { - for (DictionaryDesc dictionaryDesc : dictionaryDescList) { - if (dictionaryDesc.getBuilderClass() != null - && dictionaryDesc.getBuilderClass().equalsIgnoreCase(GLOBAL_DICTIONNARY_CLASS)) { - for (int i = 0; i < dictCols.size(); i++) { - if (dictCols.get(i).equals(dictionaryDesc.getColumnRef())) { - uhcIndex[i] = 1; - break; + public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable) throws IOException { + TableMetadataManager metaMgr = getTableManager(); + SnapshotManager snapshotMgr = getSnapshotManager(); + + TableDesc tableDesc = new TableDesc(metaMgr.getTableDesc(lookupTable, cubeSeg.getProject())); + IReadableTable hiveTable = SourceFactory.createReadableTable(tableDesc); + SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc); + + cubeSeg.putSnapshotResPath(lookupTable, snapshot.getResourcePath()); + CubeUpdate cubeBuilder = new CubeUpdate(cubeSeg.getCubeInstance()); + cubeBuilder.setToUpdateSegs(cubeSeg); + updateCube(cubeBuilder); + + return snapshot; + } + + public LookupStringTable getLookupTable(CubeSegment cubeSegment, JoinDesc join) { + + String tableName = join.getPKSide().getTableIdentity(); + String[] pkCols = join.getPrimaryKey(); + String snapshotResPath = cubeSegment.getSnapshotResPath(tableName); + if (snapshotResPath == null) + throw new IllegalStateException("No snapshot for table '" + tableName + "' found on cube segment" + + cubeSegment.getCubeInstance().getName() + "/" + cubeSegment); + + try { + SnapshotTable snapshot = getSnapshotManager().getSnapshotTable(snapshotResPath); + TableDesc tableDesc = getTableManager().getTableDesc(tableName, cubeSegment.getProject()); + return new LookupStringTable(tableDesc, pkCols, snapshot); + } catch (IOException e) { + throw new IllegalStateException( + "Failed to load lookup table " + tableName + " from snapshot " + snapshotResPath, e); + } + } + + private final String GLOBAL_DICTIONNARY_CLASS = "org.apache.kylin.dict.GlobalDictionaryBuilder"; + + //UHC (ultra high cardinality column): contain the ShardByColumns and the GlobalDictionaryColumns + public int[] getUHCIndex(CubeDesc cubeDesc) throws IOException { + List<TblColRef> dictCols = Lists.newArrayList(cubeDesc.getAllColumnsNeedDictionaryBuilt()); + int[] uhcIndex = new int[dictCols.size()]; + + //add GlobalDictionaryColumns + List<DictionaryDesc> dictionaryDescList = cubeDesc.getDictionaries(); + if (dictionaryDescList != null) { + for (DictionaryDesc dictionaryDesc : dictionaryDescList) { + if (dictionaryDesc.getBuilderClass() != null + && dictionaryDesc.getBuilderClass().equalsIgnoreCase(GLOBAL_DICTIONNARY_CLASS)) { + for (int i = 0; i < dictCols.size(); i++) { + if (dictCols.get(i).equals(dictionaryDesc.getColumnRef())) { + uhcIndex[i] = 1; + break; + } } } } } - } - //add ShardByColumns - Set<TblColRef> shardByColumns = cubeDesc.getShardByColumns(); - for (int i = 0; i < dictCols.size(); i++) { - if (shardByColumns.contains(dictCols.get(i))) { - uhcIndex[i] = 1; + //add ShardByColumns + Set<TblColRef> shardByColumns = cubeDesc.getShardByColumns(); + for (int i = 0; i < dictCols.size(); i++) { + if (shardByColumns.contains(dictCols.get(i))) { + uhcIndex[i] = 1; + } } - } - return uhcIndex; + return uhcIndex; + } } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/core-cube/src/main/java/org/apache/kylin/cube/cli/CubeSignatureRefresher.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cli/CubeSignatureRefresher.java b/core-cube/src/main/java/org/apache/kylin/cube/cli/CubeSignatureRefresher.java index d07c93b..2eaebb1 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/cli/CubeSignatureRefresher.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/cli/CubeSignatureRefresher.java @@ -97,7 +97,7 @@ public class CubeSignatureRefresher { String calculatedSign = cubeDesc.calculateSignature(); if (cubeDesc.getSignature() == null || (!cubeDesc.getSignature().equals(calculatedSign))) { cubeDesc.setSignature(calculatedSign); - store.putResource(cubeDesc.getResourcePath(), cubeDesc, CubeDescManager.CUBE_DESC_SERIALIZER); + store.putResource(cubeDesc.getResourcePath(), cubeDesc, CubeDesc.newSerializerForLowLevelAccess()); updatedResources.add(cubeDesc.getResourcePath()); } } catch (Exception e) {