This is an automated email from the ASF dual-hosted git repository. nic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 4a484f861550f07417e5890bc8478f182f922052 Author: wangxiaojing <wangxiaoj...@didichuxing.com> AuthorDate: Mon Jan 13 19:39:31 2020 +0800 KYLIN-4336 Global domain dict for MR build engine --- .../java/org/apache/kylin/cube/CubeManager.java | 15 +- .../java/org/apache/kylin/cube/model/CubeDesc.java | 26 ++++ .../CubeDescTiretreeGlobalDomainDictUtil.java | 164 +++++++++++++++++++++ .../apache/kylin/cube/model/DictionaryDesc.java | 42 ++++++ .../cube/model/validation/rule/DictionaryRule.java | 12 +- .../kylin/measure/bitmap/BitmapMeasureType.java | 8 +- .../apache/kylin/metadata/model/DataModelDesc.java | 20 ++- .../kylin/engine/mr/common/AbstractHadoopJob.java | 4 + 8 files changed, 278 insertions(+), 13 deletions(-) diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index d057982..7a44f60 100755 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -27,6 +27,7 @@ import java.util.Iterator; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.Random; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -45,6 +46,7 @@ import org.apache.kylin.common.util.Pair; import org.apache.kylin.common.util.RandomUtil; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.cube.model.CubeDescTiretreeGlobalDomainDictUtil; import org.apache.kylin.cube.model.DimensionDesc; import org.apache.kylin.cube.model.SnapshotTableDesc; import org.apache.kylin.dict.DictionaryInfo; @@ -1185,9 +1187,20 @@ public class CubeManager implements IRealizationProvider { @SuppressWarnings("unchecked") public Dictionary<String> getDictionary(CubeSegment cubeSeg, TblColRef col) { DictionaryInfo info = null; + String dictResPath = null; try { DictionaryManager dictMgr = getDictionaryManager(); - String dictResPath = cubeSeg.getDictResPath(col); + + //tiretree global domain dic + List<CubeDescTiretreeGlobalDomainDictUtil.GlobalDict> globalDicts = cubeSeg.getCubeDesc().listDomainDict(); + if (!globalDicts.isEmpty()) { + dictResPath = CubeDescTiretreeGlobalDomainDictUtil.globalReuseDictPath(cubeSeg.getConfig(), col, cubeSeg.getCubeDesc()); + } + + if (Objects.isNull(dictResPath)){ + dictResPath = cubeSeg.getDictResPath(col); + } + if (dictResPath == null) return null; diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java index f5bb427..7c16d33 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java @@ -40,6 +40,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; import java.util.Set; import java.util.TreeSet; @@ -1375,6 +1376,13 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware { result.remove(dictDesc.getColumnRef()); result.add(dictDesc.getResuseColumnRef()); } + + //tiretree global domain dic + if (Objects.isNull(dictDesc.getResuseColumnRef()) && Objects.nonNull(dictDesc.getReuseColumn())) { + logger.info("tiretree global domain dic : column {} use tiretree global domain dic, reuse column {} ", dictDesc.getColumnRef(), dictDesc.getReuseColumn()); + result.remove(dictDesc.getColumnRef()); + } + } } @@ -1382,6 +1390,24 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware { } /** + * get tiretree global domain dic + * + * @return + */ + public List<CubeDescTiretreeGlobalDomainDictUtil.GlobalDict> listDomainDict() { + List<CubeDescTiretreeGlobalDomainDictUtil.GlobalDict> dicts = new ArrayList<>(); + if(dictionaries!=null && dictionaries.size()>0) { + for (DictionaryDesc dictionaryDesc : dictionaries) { + if (dictionaryDesc.isDomain()) { + dicts.add(new CubeDescTiretreeGlobalDomainDictUtil.GlobalDict(dictionaryDesc.getColumnRef(), dictionaryDesc.getReuseColumn(), dictionaryDesc.getCube(), dictionaryDesc.getModel())); + } + } + } + return dicts; + } + + + /** * A column may reuse dictionary of another column, find the dict column, return same col if there's no reuse column */ public TblColRef getDictionaryReuseColumn(TblColRef col) { diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDescTiretreeGlobalDomainDictUtil.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDescTiretreeGlobalDomainDictUtil.java new file mode 100644 index 0000000..999fcae --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDescTiretreeGlobalDomainDictUtil.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.cube.model; + +import com.google.common.collect.ImmutableList; +import org.apache.commons.lang3.StringUtils; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.metadata.model.DataModelDesc; +import org.apache.kylin.metadata.model.DataModelManager; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.model.TableRef; +import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.source.SourceManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.List; +import java.util.Objects; +import java.util.Set; + +public class CubeDescTiretreeGlobalDomainDictUtil { + private static final Logger logger = LoggerFactory.getLogger(CubeDescTiretreeGlobalDomainDictUtil.class); + + /** + * get reuse global tiretree global dic path + * @param tblColRef + * @param cubeDesc + * @return + */ + public static String globalReuseDictPath(KylinConfig config, TblColRef tblColRef, CubeDesc cubeDesc) { + String globalResumeDictPath = null; + List<GlobalDict> globalDicts = cubeDesc.listDomainDict(); + DataModelManager metadataManager = DataModelManager.getInstance(config); + CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); + for (GlobalDict dict : globalDicts) { + if (dict.getSrc().getIdentity().equalsIgnoreCase(tblColRef.getIdentity())) { + String model = dict.getModel(); + String cube = dict.getCube(); + logger.info("cube:{} column:{} tiretree global domain dic reuse model:{} cube{} column:{} ", cubeDesc.getName() , tblColRef.getName(), model, cube, dict.getDesc()); + + DataModelDesc dataModel = metadataManager.getDataModelDesc(model); + if (Objects.isNull(dataModel)) { + logger.error("get cube:{} column:{} tiretree global domain dic reuse DataModelDesc error", cubeDesc.getName(), tblColRef.getName()); + return null; + } + + CubeInstance cubeInstance = cubeManager.getCube(cube); + CubeSegment cubeSegment = cubeInstance.getLatestReadySegment(); + + TblColRef colRef = dataModel.findColumn(dict.getDesc()); + if(Objects.isNull(colRef)){ + logger.error("get cube:{} column:{} tiretree global domain dic TblColRef error"); + return null; + } + + globalResumeDictPath = cubeSegment.getDictResPath(colRef); + + if (StringUtils.isBlank(globalResumeDictPath)) { + logger.error("get cube:{} column:{} tiretree global domain dic resume dict path error"); + } + logger.error("get cube:{} column:{} tiretree global domain dic resume dict path is {}", globalResumeDictPath); + break; + } + } + return globalResumeDictPath; + } + + + + + /** + * add resuce global tiretree global dic for baseid job + * @param cubeDesc + * @param dumpList + */ + public static void cuboidJob(CubeDesc cubeDesc, Set<String> dumpList) { + logger.info("cube {} start to add global domain dic", cubeDesc.getName()); + CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); + DataModelManager metadataManager =DataModelManager.getInstance(KylinConfig.getInstanceFromEnv()); + + cubeManager.getCube(cubeDesc.getName()); + List<GlobalDict> globalDicts = cubeDesc.listDomainDict(); + + for (GlobalDict dict : globalDicts) { + String cube = dict.getCube(); + String model = dict.getModel(); + logger.debug("cube {} column {} start to add global domain dic ,reuse {}.{}.{}", cubeDesc.getName(), dict.getSrc(), model, cube, dict.getDesc()); + CubeInstance instance = cubeManager.getCube(cube); + logger.debug("cube {} column {} start to add global domain dic ,reuse cube{} dict", cubeDesc.getName(), dict.getSrc(), instance.getName()); + + // cube, model_desc, cube_desc, table + dumpList.add(instance.getResourcePath()); + dumpList.add(instance.getDescriptor().getModel().getResourcePath()); + dumpList.add(instance.getDescriptor().getResourcePath()); + dumpList.add(instance.getProjectInstance().getResourcePath()); + + for (TableRef tableRef : instance.getDescriptor().getModel().getAllTables()) { + TableDesc table = tableRef.getTableDesc(); + dumpList.add(table.getResourcePath()); + dumpList.addAll(SourceManager.getMRDependentResources(table)); + } + + DataModelDesc dataModelDesc = metadataManager.getDataModelDesc(model); + logger.debug("cube {} column {} start to add global domain dic ,reuse model{} dict", cubeDesc.getName(), dict.getSrc(), dataModelDesc.getName()); + TblColRef tblColRef = dataModelDesc.findColumn(dict.getDesc()); + CubeSegment segment = instance.getLatestReadySegment(); + logger.debug("cube {} column {} start to add global domain dic ,reuse mode:{} cube:{} segment:{} dict,tblColRef:{}", cubeDesc.getName(), dict.getSrc(), dataModelDesc.getName(), cube, segment.getName(), tblColRef.getIdentity()); + if(segment.getDictResPath(tblColRef)!=null) { + dumpList.addAll(ImmutableList.of(segment.getDictResPath(tblColRef))); + } + } + } + + + public static class GlobalDict implements Serializable { + private TblColRef src; + private String desc; + private String cube; + private String model; + + public GlobalDict(TblColRef src, String desc, String cube, String model) { + this.src = src; + this.desc = desc; + this.cube = cube; + this.model = model; + } + + public TblColRef getSrc() { + return src; + } + + public String getDesc() { + return desc; + } + + public String getCube() { + return cube; + } + + public String getModel() { + return model; + } + } +} diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/DictionaryDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/DictionaryDesc.java index a700e10..2d1ba99 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/DictionaryDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/DictionaryDesc.java @@ -19,6 +19,7 @@ package org.apache.kylin.cube.model; import java.util.Locale; +import java.util.Objects; import org.apache.kylin.metadata.model.DataModelDesc; import org.apache.kylin.metadata.model.TblColRef; @@ -40,6 +41,15 @@ public class DictionaryDesc implements java.io.Serializable { @JsonInclude(JsonInclude.Include.NON_NULL) private String builderClass; + //for tiretree global domain dic + @JsonProperty("cube") + private String cube; + + //for tiretree global domain dic + @JsonProperty("model") + private String model; + + // computed content private TblColRef colRef; private TblColRef reuseColRef; @@ -68,6 +78,38 @@ public class DictionaryDesc implements java.io.Serializable { return builderClass; } + public String getModel() { + return model; + } + + public void setModel(String model) { + this.model = model; + } + + public String getCube() { + return cube; + } + + public void setCube(String cube) { + this.cube = cube; + } + + public String getReuseColumn() { + return reuseColumn; + } + + /** + * check if the col is tiretree global domain dic + * @return + */ + public boolean isDomain() { + if (Objects.isNull(reuseColRef) && Objects.nonNull(reuseColumn)) { + return true; + } + return false; + } + + // for test public static DictionaryDesc create(String column, String reuseColumn, String builderClass) { DictionaryDesc desc = new DictionaryDesc(); diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java index df1316d..9023f28 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java @@ -32,6 +32,8 @@ import org.apache.kylin.cube.model.validation.ResultLevel; import org.apache.kylin.cube.model.validation.ValidateContext; import org.apache.kylin.dict.GlobalDictionaryBuilder; import org.apache.kylin.metadata.model.TblColRef; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Validate Dictionary Settings: @@ -43,6 +45,8 @@ import org.apache.kylin.metadata.model.TblColRef; * </ul> */ public class DictionaryRule implements IValidatorRule<CubeDesc> { + private static final Logger logger = LoggerFactory.getLogger(DictionaryRule.class); + static final String ERROR_DUPLICATE_DICTIONARY_COLUMN = "Duplicated dictionary specification for column: "; static final String ERROR_REUSE_BUILDER_BOTH_SET = "REUSE and BUILDER both set on dictionary for column: "; static final String ERROR_REUSE_BUILDER_BOTH_EMPTY = "REUSE and BUILDER both empty on dictionary for column: "; @@ -80,8 +84,12 @@ public class DictionaryRule implements IValidatorRule<CubeDesc> { } if (reuseCol == null && StringUtils.isEmpty(builderClass)) { - context.addResult(ResultLevel.ERROR, ERROR_REUSE_BUILDER_BOTH_EMPTY + dictCol); - return; + if(dictDesc.isDomain()) { + logger.info("() is tiretree global domain dic", dictCol); + }else{ + context.addResult(ResultLevel.ERROR, ERROR_REUSE_BUILDER_BOTH_EMPTY + dictCol); + return; + } } if (StringUtils.isNotEmpty(builderClass) && builderClass.equalsIgnoreCase(GlobalDictionaryBuilder.class.getName()) && dimensionColumns.contains(dictCol) && rowKeyDesc.isUseDictionary(dictCol)) { diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java index 7134c6b..450a62a 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java @@ -110,10 +110,10 @@ public class BitmapMeasureType extends MeasureType<BitmapCounter> { } int id; - if (needDictionaryColumn(measureDesc.getFunction())) { - TblColRef literalCol = measureDesc.getFunction().getParameter().getColRefs().get(0); - Dictionary<String> dictionary = dictionaryMap.get(literalCol); - id = dictionary.getIdFromValue(values[0]); + TblColRef literalCol = measureDesc.getFunction().getParameter().getColRefs().get(0); + if (needDictionaryColumn(measureDesc.getFunction()) && dictionaryMap.containsKey(literalCol)) { + Dictionary<String> dictionary = dictionaryMap.get(literalCol); + id = dictionary.getIdFromValue(values[0]); } else { id = Integer.parseInt(values[0]); } diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java index 818afdf..12dd63f 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java @@ -27,6 +27,7 @@ import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.Queue; import java.util.Set; @@ -285,10 +286,16 @@ public class DataModelDesc extends RootPersistentEntity { } public TblColRef findColumn(String table, String column) throws IllegalArgumentException { + TblColRef result = null; TableRef tableRef = findTable(table); - TblColRef result = tableRef.getColumn(column.toUpperCase(Locale.ROOT)); - if (result == null) - throw new IllegalArgumentException("Column not found by " + table + "." + column); + if (Objects.nonNull(tableRef)) { + result = tableRef.getColumn(column.toUpperCase(Locale.ROOT));; + } + + if (result == null) {//tiretree global domain dic + logger.warn("table {} column {} not found in its's model {} , maybe it's a tiretree global domain dict. ", table, column, getName() ); + } + return result; } @@ -310,8 +317,9 @@ public class DataModelDesc extends RootPersistentEntity { } } - if (result == null) - throw new IllegalArgumentException("Column not found by " + input); + if (result == null) { + logger.warn("Column {} not found in its's model {} , maybe it's a tiretree global domain dict. ", column, getName() ); + } return result; } @@ -320,7 +328,7 @@ public class DataModelDesc extends RootPersistentEntity { public TableRef findTable(String table) throws IllegalArgumentException { TableRef result = tableNameMap.get(table.toUpperCase(Locale.ROOT)); if (result == null) { - throw new IllegalArgumentException("Table not found by " + table); + logger.warn("table {} not found in its's model {} , maybe it's a tiretree global domain dict. ", table, getName() ); } return result; } diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java index 5e49c76..fd4d413 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java @@ -69,6 +69,7 @@ import org.apache.kylin.common.util.StringSplitter; import org.apache.kylin.common.util.StringUtil; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.model.CubeDescTiretreeGlobalDomainDictUtil; import org.apache.kylin.job.JobInstance; import org.apache.kylin.job.exception.JobException; import org.apache.kylin.metadata.model.TableDesc; @@ -586,6 +587,9 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { if (ifStatsIncluded) { dumpList.add(segment.getStatisticsResourcePath()); } + //tiretree global domain dic + CubeDescTiretreeGlobalDomainDictUtil.cuboidJob(segment.getCubeDesc(), dumpList); + dumpKylinPropsAndMetadata(segment.getProject(), dumpList, segment.getConfig(), conf); }