KYLIN-1476 Support measure dictionary in 1.x
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d517530c Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d517530c Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d517530c Branch: refs/heads/1.3.x Commit: d517530c93dc3176a52295af297afdfefdfc4b62 Parents: 438a6ad Author: wangxiaoyu <romansew...@gmail.com> Authored: Tue Mar 8 23:39:51 2016 +0800 Committer: Xiaoyu Wang <wangxia...@apache.org> Committed: Mon Mar 21 20:00:57 2016 +0800 ---------------------------------------------------------------------- .../java/org/apache/kylin/cube/CubeManager.java | 26 ++++++++++++- .../kylin/cube/cli/DictionaryGeneratorCLI.java | 14 +++---- .../kylin/job/cube/MergeDictionaryStep.java | 17 ++++----- .../hadoop/cube/FactDistinctColumnsMapper.java | 40 +++++--------------- .../hadoop/cube/FactDistinctColumnsReducer.java | 12 ++---- .../hadoop/hive/CubeJoinedFlatTableDesc.java | 18 +++++++-- 6 files changed, 65 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/d517530c/cube/src/main/java/org/apache/kylin/cube/CubeManager.java ---------------------------------------------------------------------- diff --git a/cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/cube/src/main/java/org/apache/kylin/cube/CubeManager.java index 88ce4f1..4615762 100644 --- a/cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -194,11 +194,11 @@ public class CubeManager implements IRealizationProvider { public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, String factColumnsPath) throws IOException { CubeDesc cubeDesc = cubeSeg.getCubeDesc(); - if (!cubeDesc.getRowkey().isUseDictionary(col)) + if (!cubeDesc.getAllColumnsNeedDictionary().contains(col)) return null; DictionaryManager dictMgr = getDictionaryManager(); - DictionaryInfo dictInfo = dictMgr.buildDictionary(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, factColumnsPath); + DictionaryInfo dictInfo = dictMgr.buildDictionary(cubeDesc.getModel(), "true", col, factColumnsPath); if (dictInfo != null) { cubeSeg.putDictResPath(col, dictInfo.getResourcePath()); @@ -791,4 +791,26 @@ public class CubeManager implements IRealizationProvider { return getCube(name); } + + /** + * Get the columns which need build the dictionary from fact table. (the column exists on fact and is not fk) + * @param cubeDesc + * @return + * @throws IOException + */ + public List<TblColRef> getAllDictColumnsOnFact(CubeDesc cubeDesc) throws IOException { + List<TblColRef> dictionaryColumns = cubeDesc.getAllColumnsNeedDictionary(); + + List<TblColRef> factDictCols = new ArrayList<TblColRef>(); + DictionaryManager dictMgr = DictionaryManager.getInstance(config); + for (int i = 0; i < dictionaryColumns.size(); i++) { + TblColRef col = dictionaryColumns.get(i); + + String scanTable = (String) dictMgr.decideSourceData(cubeDesc.getModel(), "true", col, null)[0]; + if (cubeDesc.getModel().isFactTable(scanTable)) { + factDictCols.add(col); + } + } + return factDictCols; + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/d517530c/cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java ---------------------------------------------------------------------- diff --git a/cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java b/cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java index 3cdaa93..bcbd496 100644 --- a/cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java +++ b/cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java @@ -44,15 +44,13 @@ public class DictionaryGeneratorCLI { private static void processSegment(KylinConfig config, CubeSegment cubeSeg, String factColumnsPath) throws IOException { CubeManager cubeMgr = CubeManager.getInstance(config); - for (DimensionDesc dim : cubeSeg.getCubeDesc().getDimensions()) { - // dictionary - for (TblColRef col : dim.getColumnRefs()) { - if (cubeSeg.getCubeDesc().getRowkey().isUseDictionary(col)) { - logger.info("Building dictionary for " + col); - cubeMgr.buildDictionary(cubeSeg, col, factColumnsPath); - } - } + // dictionary + for (TblColRef col : cubeSeg.getCubeDesc().getAllColumnsNeedDictionary()) { + logger.info("Building dictionary for " + col); + cubeMgr.buildDictionary(cubeSeg, col, factColumnsPath); + } + for (DimensionDesc dim : cubeSeg.getCubeDesc().getDimensions()) { // build snapshot if (dim.getTable() != null && !dim.getTable().equalsIgnoreCase(cubeSeg.getCubeDesc().getFactTable())) { // CubeSegment seg = cube.getTheOnlySegment(); http://git-wip-us.apache.org/repos/asf/kylin/blob/d517530c/job/src/main/java/org/apache/kylin/job/cube/MergeDictionaryStep.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/cube/MergeDictionaryStep.java b/job/src/main/java/org/apache/kylin/job/cube/MergeDictionaryStep.java index 8f77558..47151a7 100644 --- a/job/src/main/java/org/apache/kylin/job/cube/MergeDictionaryStep.java +++ b/job/src/main/java/org/apache/kylin/job/cube/MergeDictionaryStep.java @@ -107,16 +107,13 @@ public class MergeDictionaryStep extends AbstractExecutable { DictionaryManager dictMgr = DictionaryManager.getInstance(conf); CubeDesc cubeDesc = cube.getDescriptor(); - for (DimensionDesc dim : cubeDesc.getDimensions()) { - for (TblColRef col : dim.getColumnRefs()) { - if (newSeg.getCubeDesc().getRowkey().isUseDictionary(col)) { - String dictTable = (String) dictMgr.decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, null)[0]; - if (cubeDesc.getFactTable().equalsIgnoreCase(dictTable)) { - colsNeedMeringDict.add(col); - } else { - colsNeedCopyDict.add(col); - } - } + + for (TblColRef col : cubeDesc.getAllColumnsNeedDictionary()) { + String dictTable = (String) dictMgr.decideSourceData(cubeDesc.getModel(), "true", col, null)[0]; + if (cubeDesc.getFactTable().equalsIgnoreCase(dictTable)) { + colsNeedMeringDict.add(col); + } else { + colsNeedCopyDict.add(col); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/d517530c/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapper.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapper.java index 72802aa..b70113d 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapper.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapper.java @@ -19,7 +19,6 @@ package org.apache.kylin.job.hadoop.cube; import java.io.IOException; -import java.util.ArrayList; import java.util.List; import org.apache.hadoop.conf.Configuration; @@ -34,10 +33,7 @@ import org.apache.kylin.common.mr.KylinMapper; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.cube.model.RowKeyDesc; -import org.apache.kylin.dict.DictionaryManager; import org.apache.kylin.job.constant.BatchConstants; import org.apache.kylin.job.hadoop.AbstractHadoopJob; import org.apache.kylin.job.hadoop.hive.CubeJoinedFlatTableDesc; @@ -51,7 +47,7 @@ public class FactDistinctColumnsMapper<KEYIN> extends KylinMapper<KEYIN, HCatRec private String cubeName; private CubeInstance cube; private CubeDesc cubeDesc; - private int[] factDictCols; + List<TblColRef> factDictCols; private CubeJoinedFlatTableDesc intermediateTableDesc; @@ -60,6 +56,7 @@ public class FactDistinctColumnsMapper<KEYIN> extends KylinMapper<KEYIN, HCatRec private int errorRecordCounter; private HCatSchema schema = null; + protected int[] dictionaryColumnIndex; @Override protected void setup(Context context) throws IOException { @@ -73,40 +70,23 @@ public class FactDistinctColumnsMapper<KEYIN> extends KylinMapper<KEYIN, HCatRec cubeDesc = cube.getDescriptor(); intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null); - long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); - Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId); - List<TblColRef> columns = baseCuboid.getColumns(); - - ArrayList<Integer> factDictCols = new ArrayList<Integer>(); - RowKeyDesc rowkey = cubeDesc.getRowkey(); - DictionaryManager dictMgr = DictionaryManager.getInstance(config); - for (int i = 0; i < columns.size(); i++) { - TblColRef col = columns.get(i); - if (rowkey.isUseDictionary(col) == false) - continue; - - String scanTable = (String) dictMgr.decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, null)[0]; - if (cubeDesc.getModel().isFactTable(scanTable)) { - factDictCols.add(i); - } + factDictCols = CubeManager.getInstance(config).getAllDictColumnsOnFact(cubeDesc); + dictionaryColumnIndex = new int[factDictCols.size()]; + for (int i = 0; i < factDictCols.size(); i++) { + TblColRef colRef = factDictCols.get(i); + int columnIndexOnFlatTbl = intermediateTableDesc.getColumnIndex(colRef); + dictionaryColumnIndex[i] = columnIndexOnFlatTbl; } - this.factDictCols = new int[factDictCols.size()]; - for (int i = 0; i < factDictCols.size(); i++) - this.factDictCols[i] = factDictCols.get(i); - schema = HCatInputFormat.getTableSchema(context.getConfiguration()); } @Override public void map(KEYIN key, HCatRecord record, Context context) throws IOException, InterruptedException { - try { - - int[] flatTableIndexes = intermediateTableDesc.getRowKeyColumnIndexes(); HCatFieldSchema fieldSchema = null; - for (int i : factDictCols) { + for (int i = 0; i < factDictCols.size(); i++) { outputKey.set((short) i); - fieldSchema = schema.get(flatTableIndexes[i]); + fieldSchema = schema.get(dictionaryColumnIndex[i]); Object fieldValue = record.get(fieldSchema.getName(), schema); if (fieldValue == null) continue; http://git-wip-us.apache.org/repos/asf/kylin/blob/d517530c/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java index 89f90ba..809ced2 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java @@ -31,14 +31,12 @@ import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.job.constant.BatchConstants; import org.apache.kylin.job.hadoop.AbstractHadoopJob; import org.apache.kylin.metadata.model.TblColRef; import java.io.IOException; -import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -47,8 +45,7 @@ import java.util.List; */ public class FactDistinctColumnsReducer extends KylinReducer<ShortWritable, Text, NullWritable, Text> { - private List<TblColRef> columnList = new ArrayList<TblColRef>(); - + private List<TblColRef> factDictCols; @Override protected void setup(Context context) throws IOException { super.publishConfiguration(context.getConfiguration()); @@ -58,15 +55,12 @@ public class FactDistinctColumnsReducer extends KylinReducer<ShortWritable, Text String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME); CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName); CubeDesc cubeDesc = cube.getDescriptor(); - - long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); - Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId); - columnList = baseCuboid.getColumns(); + factDictCols = CubeManager.getInstance(config).getAllDictColumnsOnFact(cubeDesc); } @Override public void reduce(ShortWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { - TblColRef col = columnList.get(key.get()); + TblColRef col = factDictCols.get(key.get()); HashSet<ByteArray> set = new HashSet<ByteArray>(); for (Text textValue : values) { http://git-wip-us.apache.org/repos/asf/kylin/blob/d517530c/job/src/main/java/org/apache/kylin/job/hadoop/hive/CubeJoinedFlatTableDesc.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/hive/CubeJoinedFlatTableDesc.java b/job/src/main/java/org/apache/kylin/job/hadoop/hive/CubeJoinedFlatTableDesc.java index b2bed9f..72dd400 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/hive/CubeJoinedFlatTableDesc.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/hive/CubeJoinedFlatTableDesc.java @@ -48,9 +48,12 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc { private List<IntermediateColumnDesc> columnList = Lists.newArrayList(); + private Map<String, Integer> columnIndexMap; + public CubeJoinedFlatTableDesc(CubeDesc cubeDesc, CubeSegment cubeSegment) { this.cubeDesc = cubeDesc; this.cubeSegment = cubeSegment; + this.columnIndexMap = Maps.newHashMap(); parseCubeDesc(); } @@ -73,10 +76,9 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc { this.tableName = "kylin_intermediate_" + cubeDesc.getName() + "_" + cubeSegment.getName(); } - Map<String, Integer> dimensionIndexMap = Maps.newHashMap(); int columnIndex = 0; for (TblColRef col : cubeDesc.listDimensionColumnsExcludingDerived()) { - dimensionIndexMap.put(colName(col.getCanonicalName()), columnIndex); + columnIndexMap.put(colName(col.getCanonicalName()), columnIndex); columnList.add(new IntermediateColumnDesc(String.valueOf(columnIndex), col)); columnIndex++; } @@ -86,7 +88,7 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc { rowKeyColumnIndexes = new int[rowkeyColCount]; for (int i = 0; i < rowkeyColCount; i++) { String colName = colName(cuboidColumns.get(i).getCanonicalName()); - Integer dimIdx = dimensionIndexMap.get(colName); + Integer dimIdx = columnIndexMap.get(colName); if (dimIdx == null) { throw new RuntimeException("Can't find column " + colName); } @@ -108,6 +110,7 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc { measureColumnIndexes[i][j] = contains(columnList, c); if (measureColumnIndexes[i][j] < 0) { measureColumnIndexes[i][j] = columnIndex; + columnIndexMap.put(colName(c.getCanonicalName()), columnIndex); columnList.add(new IntermediateColumnDesc(String.valueOf(columnIndex), c)); columnIndex++; } @@ -172,4 +175,13 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc { private static String colName(String canonicalColName) { return canonicalColName.replace(".", "_"); } + + public int getColumnIndex(TblColRef colRef) { + String key = colName(colRef.getCanonicalName()); + Integer index = columnIndexMap.get(key); + if (index == null) + throw new IllegalArgumentException("Column " + colRef.toString() + " wasn't found on flat table."); + + return index.intValue(); + } }