This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 3e5058155a32ab4f4867290227ef5ec4bc2c6ba9 Author: wangxiaojing <wangxiaoj...@didichuxing.com> AuthorDate: Wed May 6 14:43:08 2020 +0800 KYLIN-4366 Build Global Dict by MR/Hive, Merge to dict table Step implementation --- .../kylin/engine/mr/BatchCubingJobBuilder2.java | 5 +- .../java/org/apache/kylin/engine/mr/IInput.java | 3 + .../apache/kylin/source/hive/HiveInputBase.java | 82 +++++++++++++--------- .../apache/kylin/source/kafka/KafkaInputBase.java | 5 ++ 4 files changed, 62 insertions(+), 33 deletions(-) diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java index b62650a..8ec7d36 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java @@ -73,7 +73,10 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport { result.addTask(createBuildGlobalHiveDicTotalBuildJob(jobId)); } - //toDo merge global dic and replace flat table + //merge global dic and replace flat table + if(Objects.nonNull(dictConfig.getMrHiveDictColumns()) && dictConfig.getMrHiveDictColumns().length > 0 && !"".equals(dictConfig.getMrHiveDictColumns()[0])){ + inputSide.addStepPhase_ReplaceFlatTableGlobalColumnValue(result); + } // Phase 2: Build Dictionary result.addTask(createFactDistinctColumnsStep(jobId)); diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IInput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IInput.java index 758b081..9fdb300 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IInput.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IInput.java @@ -34,6 +34,9 @@ public interface IInput { /** Add step that creates an intermediate flat table as defined by CubeJoinedFlatTableDesc */ public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow); + /** Add step that replace flat table global column value by global dic*/ + public void addStepPhase_ReplaceFlatTableGlobalColumnValue(DefaultChainedExecutable jobFlow); + /** Add step that does necessary clean up, like delete the intermediate flat table */ public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow); } diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java index 65b8dc6..8bad023 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java @@ -126,6 +126,30 @@ public class HiveInputBase { addStepPhase1_DoMaterializeLookupTable(jobFlow); } + @Override + public void addStepPhase_ReplaceFlatTableGlobalColumnValue(DefaultChainedExecutable jobFlow) { + KylinConfig dictConfig = (flatDesc.getSegment()).getConfig(); + final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams()); + String[] mrHiveDictColumnsExcludeRefCols = dictConfig.getMrHiveDictColumnsExcludeRefColumns(); + Map<String, String> dictRef = dictConfig.getMrHiveDictRefColumns(); + final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase); + + String globalDictDatabase = dictConfig.getMrHiveDictDB(); + if (null == globalDictDatabase) { + throw new IllegalArgumentException("Mr-Hive Global dict database is null."); + } + String globalDictTable = cubeName + dictConfig.getMrHiveDictTableSuffix(); + if(Objects.nonNull(mrHiveDictColumnsExcludeRefCols) && mrHiveDictColumnsExcludeRefCols.length > 0) { + //merge to dict table step + jobFlow.addTask(createHiveGlobalDictMergeGlobalDict(flatDesc, hiveInitStatements, cubeName, mrHiveDictColumnsExcludeRefCols, globalDictDatabase, globalDictTable)); + + for (String item : mrHiveDictColumnsExcludeRefCols) { + dictRef.put(item, ""); + } + } + //toDo add replace step + } + protected void addStepPhase1_DoCreateMrHiveGlobalDict(DefaultChainedExecutable jobFlow, String[] mrHiveDictColumns, String globalDictDatabase, String globalDictTable) { final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams()); @@ -180,49 +204,43 @@ public class HiveInputBase { return step; } - protected static AbstractExecutable createMrHiveGlobalDictReplaceStep(IJoinedFlatTableDesc flatDesc, - String hiveInitStatements, String hdfsWorkingDir, String cubeName, String[] mrHiveDictColumns, - String flatTableDatabase, String globalDictDatabase, String globalDictTable) { + protected static AbstractExecutable createHiveGlobalDictMergeGlobalDict(IJoinedFlatTableDesc flatDesc, + String hiveInitStatements, String cubeName, String[] mrHiveDictColumns, + String globalDictDatabase, String globalDictTable) { + + String globalDictItermediateTable = MRHiveDictUtil.getMRHiveFlatTableGlobalDictTableName(flatDesc); + + StringBuffer addPartition = new StringBuffer(); + Map<String, String> maxDictValMap = new HashMap<>(); Map<String, String> dictHqlMap = new HashMap<>(); for (String dictColumn : mrHiveDictColumns) { - StringBuilder dictHql = new StringBuilder(); - TblColRef dictColumnRef = null; - - String flatTable = flatTableDatabase + "." + flatDesc.getTableName(); - // replace the flat table's dict column value - dictHql.append("INSERT OVERWRITE TABLE " + flatTable + " \n"); try { - dictHql.append("SELECT \n"); - Integer flatTableColumnSize = flatDesc.getAllColumns().size(); - for (int i = 0; i < flatTableColumnSize; i++) { - TblColRef tblColRef = flatDesc.getAllColumns().get(i); - if (i > 0) { - dictHql.append(","); - } - if (JoinedFlatTable.colName(tblColRef, flatDesc.useAlias()).equalsIgnoreCase(dictColumn)) { - dictHql.append("b. dict_val \n"); - dictColumnRef = tblColRef; - } else { - dictHql.append("a." + JoinedFlatTable.colName(tblColRef) + " \n"); - } - } - dictHql.append("FROM " + flatTable + " a \n" + "LEFT OUTER JOIN \n" + "( \n" - + "SELECT dict_key, dict_val FROM " + globalDictDatabase + "." + globalDictTable - + " WHERE dict_column = '" + dictColumn + "' \n" + ") b \n" + " ON a." - + JoinedFlatTable.colName(dictColumnRef) + " = b.dict_key;"); - dictHqlMap.put(dictColumn, dictHql.toString()); + addPartition.append("alter table ").append(globalDictItermediateTable) + .append(" add IF NOT EXISTS partition (dict_column='").append(dictColumn) + .append("');").append(" \n"); + + String dictHql = "INSERT OVERWRITE TABLE " + globalDictDatabase + "." + globalDictTable + " \n" + + "PARTITION (dict_column = '" + dictColumn + "') \n" + + "SELECT dict_key, dict_val FROM " + + globalDictDatabase + "." + globalDictTable + " \n" + "WHERE dict_column = '" + dictColumn + + "' \n" + flatDesc.getDataModel().getConfig().getHiveUnionStyle() + " \n" + + "SELECT dict_key, dict_val FROM " + + globalDictItermediateTable + " \n" + " WHERE dict_column = '" + dictColumn + "' ;\n"; + dictHqlMap.put(dictColumn, dictHql); } catch (Exception e) { logger.error("", e); } } + String hiveInitStatementForUnstrict = "set hive.mapred.mode=unstrict;"; CreateMrHiveDictStep step = new CreateMrHiveDictStep(); - step.setInitStatement(hiveInitStatements); + step.setInitStatement(hiveInitStatements + hiveInitStatementForUnstrict + addPartition); step.setCreateTableStatementMap(dictHqlMap); - step.setIsUnLock(true); + step.setMaxDictStatementMap(maxDictValMap); + step.setIsLock(false); + step.setIsUnLock(false); step.setLockPathName(cubeName); - CubingExecutableUtil.setCubeName(cubeName, step.getParams()); - step.setName(ExecutableConstants.STEP_NAME_GLOBAL_DICT_MRHIVE_REPLACE_DICTVAL); + step.setName(ExecutableConstants.STEP_NAME_GLOBAL_DICT_MRHIVE_BUILD_DICTVAL); return step; } diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaInputBase.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaInputBase.java index d9e112c..5ba9082 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaInputBase.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaInputBase.java @@ -88,6 +88,11 @@ public class KafkaInputBase { } } + @Override + public void addStepPhase_ReplaceFlatTableGlobalColumnValue(DefaultChainedExecutable jobFlow) { + //do nothing + } + protected String getJobWorkingDir(DefaultChainedExecutable jobFlow) { return JobBuilderSupport.getJobWorkingDir(config.getHdfsWorkingDirectory(), jobFlow.getId()); }