This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 5addc9da885a606ca1f236261df1c314bdb13e0f Author: wangxiaojing <wangxiaoj...@didichuxing.com> AuthorDate: Wed May 6 14:43:45 2020 +0800 KYLIN-4367 Build Global Dict by MR/Hive, Replace intermediate table Step implementation --- .../apache/kylin/source/hive/HiveInputBase.java | 69 +++++++++++++++++++++- 1 file changed, 68 insertions(+), 1 deletion(-) diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java index 8bad023..624c8f9 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java @@ -30,6 +30,7 @@ import java.util.Set; import java.util.Locale; import java.util.Collections; +import org.apache.kylin.shaded.com.google.common.base.Strings; import org.apache.kylin.shaded.com.google.common.collect.Lists; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -147,7 +148,13 @@ public class HiveInputBase { dictRef.put(item, ""); } } - //toDo add replace step + + //replace step + if(dictRef.size()>0) { /* Schedule the replace step only when at least one dict column was collected into dictRef above. NOTE(review): !dictRef.isEmpty() would read more clearly. */ + jobFlow.addTask(createMrHiveGlobalDictReplaceStep(flatDesc, hiveInitStatements, cubeName, + dictRef, flatTableDatabase, globalDictDatabase, globalDictTable, dictConfig.getMrHiveDictTableSuffix(), jobFlow.getId())); + } + + } protected void addStepPhase1_DoCreateMrHiveGlobalDict(DefaultChainedExecutable jobFlow, @@ -244,6 +251,66 @@ public class HiveInputBase { return step; } + protected static AbstractExecutable createMrHiveGlobalDictReplaceStep(IJoinedFlatTableDesc flatDesc, String hiveInitStatements, String cubeName, Map<String, String> mrHiveDictColumns, String flatTableDatabase, String globalDictDatabase, String globalDictTable, String dictSuffix, String jobId) { /* Builds one INSERT OVERWRITE statement per key of mrHiveDictColumns. Each statement rewrites the flat table and replaces the matching column with b.dict_val, taken from a LEFT OUTER JOIN on dict_key against a dict table filtered by dict_column. Rows with no matching dict_key therefore get NULL in that column. The statements, keyed by column name, are returned inside a CreateMrHiveDictStep. A null or empty map value uses globalDictTable filtered by the column name itself. Otherwise the value is split on dots: element 0 plus dictSuffix names the dict table, and element 1 is the dict_column filter. NOTE(review): jobId is not used in this method. */ + Map<String, String> dictHqlMap = new HashMap<>(); + StringBuilder addPartition = new StringBuilder(); /* NOTE(review): never appended to in this method, so it contributes nothing to the init statement below. */ + for (String dictColumn : mrHiveDictColumns.keySet()) { /* NOTE(review): iterating entrySet() would avoid the repeated mrHiveDictColumns.get(dictColumn) lookups below. */ + 
StringBuilder dictHql = new StringBuilder(); + TblColRef dictColumnRef = null; /* NOTE(review): stays null when no flat-table column name matches dictColumn. The ON clause then calls JoinedFlatTable.colName(null), which presumably throws and is only logged by the catch below, so this column is silently left out of dictHqlMap. */ + + String flatTable = flatTableDatabase + "." + flatDesc.getTableName(); + // replace the flat table's dict column value + dictHql.append("INSERT OVERWRITE TABLE " + flatTable + " \n"); + try { + dictHql.append("SELECT \n"); + Integer flatTableColumnSize = flatDesc.getAllColumns().size(); /* NOTE(review): boxed Integer, and getAllColumns() is called again on every iteration; a local list would be simpler. */ + for (int i = 0; i < flatTableColumnSize; i++) { + TblColRef tblColRef = flatDesc.getAllColumns().get(i); + if (i > 0) { + dictHql.append(","); + } + if (JoinedFlatTable.colName(tblColRef, flatDesc.useAlias()).equalsIgnoreCase(dictColumn)) { /* Matching honours flatDesc.useAlias(), but the SELECT list and ON clause use the one-argument colName. NOTE(review): confirm both yield the same column name. */ + dictHql.append("b.dict_val \n"); + dictColumnRef = tblColRef; + } else { + dictHql.append("a." + JoinedFlatTable.colName(tblColRef) + " \n"); + } + } + + if (!Strings.isNullOrEmpty(mrHiveDictColumns.get(dictColumn))) { + String[] cubePartion = mrHiveDictColumns.get(dictColumn).split("\\."); /* NOTE(review): a value without a dot yields a one-element array, so cubePartion[1] throws ArrayIndexOutOfBoundsException, which the catch below only logs. Any further dot-separated parts are ignored. */ + + String refGlobalDictTable = cubePartion[0] + dictSuffix; + String refDictColumn = cubePartion[1]; + + dictHql.append("FROM " + flatTable + " a \n" + "LEFT OUTER JOIN \n" + "( \n" + + "SELECT dict_key, dict_val FROM " + globalDictDatabase + "." + refGlobalDictTable + + " WHERE dict_column = '" + refDictColumn + "' \n" + ") b \n" + " ON a." + + JoinedFlatTable.colName(dictColumnRef) + " = b.dict_key;"); + dictHqlMap.put(dictColumn, dictHql.toString()); /* NOTE(review): redundant; the same put runs again right after the if/else. */ + }else { + dictHql.append("FROM " + flatTable + " a \n" + "LEFT OUTER JOIN \n" + "( \n" + + "SELECT dict_key, dict_val FROM " + globalDictDatabase + "." + globalDictTable + + " WHERE dict_column = '" + dictColumn + "' \n" + ") b \n" + " ON a." 
+ JoinedFlatTable.colName(dictColumnRef) + " = b.dict_key;"); + } + dictHqlMap.put(dictColumn, dictHql.toString()); + } catch (Exception e) { + logger.error("", e); /* NOTE(review): broad catch with an empty log message. A failure here drops this column from dictHqlMap without failing job construction. */ + } + } + String set = "set hive.exec.compress.output=false; set hive.mapred.mode=unstrict;"; /* Hive session settings appended after hiveInitStatements. NOTE(review): Hive documents strict and nonstrict for hive.mapred.mode; confirm unstrict is intended. */ + CreateMrHiveDictStep step = new CreateMrHiveDictStep(); + step.setInitStatement(hiveInitStatements + set + addPartition); /* NOTE(review): plain concatenation, so hiveInitStatements is assumed to end with a statement terminator - TODO confirm. */ + step.setCreateTableStatementMap(dictHqlMap); + step.setIsUnLock(true); + step.setLockPathName(cubeName); + //toDo Fix distributed concurrency lock bug + CubingExecutableUtil.setCubeName(cubeName, step.getParams()); + step.setName(ExecutableConstants.STEP_NAME_GLOBAL_DICT_MRHIVE_REPLACE_DICTVAL); + return step; + } + protected void addStepPhase1_DoCreateFlatTable(DefaultChainedExecutable jobFlow) { final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams()); final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);