This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 3e5058155a32ab4f4867290227ef5ec4bc2c6ba9
Author: wangxiaojing <wangxiaoj...@didichuxing.com>
AuthorDate: Wed May 6 14:43:08 2020 +0800

    KYLIN-4366 Build Global Dict by MR/Hive, merge-to-dict-table step implementation
---
 .../kylin/engine/mr/BatchCubingJobBuilder2.java    |  5 +-
 .../java/org/apache/kylin/engine/mr/IInput.java    |  3 +
 .../apache/kylin/source/hive/HiveInputBase.java    | 82 +++++++++++++---------
 .../apache/kylin/source/kafka/KafkaInputBase.java  |  5 ++
 4 files changed, 62 insertions(+), 33 deletions(-)

diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index b62650a..8ec7d36 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -73,7 +73,10 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
             result.addTask(createBuildGlobalHiveDicTotalBuildJob(jobId));
         }
 
-        //toDo merge global dic and replace flat table
+        // merge the global dict and replace flat table column values
+        if (Objects.nonNull(dictConfig.getMrHiveDictColumns()) && dictConfig.getMrHiveDictColumns().length > 0 && !"".equals(dictConfig.getMrHiveDictColumns()[0])) {
+            inputSide.addStepPhase_ReplaceFlatTableGlobalColumnValue(result);
+        }
 
         // Phase 2: Build Dictionary
         result.addTask(createFactDistinctColumnsStep(jobId));
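
The guard above wires the new merge/replace phase only when MR/Hive global
dict columns are actually configured; a blank property can parse into a
single empty string, hence the extra check on the first element. A minimal
sketch of the same check factored into a helper (the helper name is
hypothetical, not part of this patch):

    // Hypothetical helper, not in the patch: true only when at least one
    // MR/Hive global dict column is configured. The cols[0] check guards
    // against a blank property value parsing into a single empty string.
    private static boolean hasMrHiveDictColumns(org.apache.kylin.common.KylinConfig config) {
        String[] cols = config.getMrHiveDictColumns();
        return cols != null && cols.length > 0 && !"".equals(cols[0]);
    }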
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IInput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IInput.java
index 758b081..9fdb300 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IInput.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IInput.java
@@ -34,6 +34,9 @@ public interface IInput {
         /** Add step that creates an intermediate flat table as defined by CubeJoinedFlatTableDesc */
         public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow);
 
+        /** Add step that replaces the flat table's global dict column values with the global dict */
+        public void addStepPhase_ReplaceFlatTableGlobalColumnValue(DefaultChainedExecutable jobFlow);
+
         /** Add step that does necessary clean up, like delete the intermediate flat table */
         public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
     }
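
The hook added above must be implemented by every batch input side. A source
that has no Hive flat table can satisfy the contract with a no-op, which is
what the Kafka input does at the end of this patch; a minimal sketch:

    // Sketch of a conforming no-op implementation for an input source
    // without a Hive flat table (mirrors KafkaInputBase below).
    @Override
    public void addStepPhase_ReplaceFlatTableGlobalColumnValue(DefaultChainedExecutable jobFlow) {
        // nothing to replace for this source
    }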
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
index 65b8dc6..8bad023 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
@@ -126,6 +126,30 @@ public class HiveInputBase {
             addStepPhase1_DoMaterializeLookupTable(jobFlow);
         }
 
+        @Override
+        public void addStepPhase_ReplaceFlatTableGlobalColumnValue(DefaultChainedExecutable jobFlow) {
+            KylinConfig dictConfig = flatDesc.getSegment().getConfig();
+            final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
+            String[] mrHiveDictColumnsExcludeRefCols = dictConfig.getMrHiveDictColumnsExcludeRefColumns();
+            Map<String, String> dictRef = dictConfig.getMrHiveDictRefColumns();
+            final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
+
+            String globalDictDatabase = dictConfig.getMrHiveDictDB();
+            if (null == globalDictDatabase) {
+                throw new IllegalArgumentException("Mr-Hive Global dict database is null.");
+            }
+            String globalDictTable = cubeName + dictConfig.getMrHiveDictTableSuffix();
+            if (Objects.nonNull(mrHiveDictColumnsExcludeRefCols) && mrHiveDictColumnsExcludeRefCols.length > 0) {
+                // merge the segment dict into the global dict table
+                jobFlow.addTask(createHiveGlobalDictMergeGlobalDict(flatDesc, hiveInitStatements, cubeName, mrHiveDictColumnsExcludeRefCols, globalDictDatabase, globalDictTable));
+
+                for (String item : mrHiveDictColumnsExcludeRefCols) {
+                    dictRef.put(item, "");
+                }
+            }
+            // TODO: add the replace step
+        }
+
         protected void addStepPhase1_DoCreateMrHiveGlobalDict(DefaultChainedExecutable jobFlow,
                 String[] mrHiveDictColumns, String globalDictDatabase, String globalDictTable) {
             final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
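
For context on the merge step wired in above: the HQL in the next hunk reads
and writes a global dict table partitioned by dict column, where each row
maps an original value (dict_key) to its encoded id (dict_val). A sketch of
that assumed layout; the database name, table name, and column types below
are illustrative assumptions, not taken from the patch:

    // Assumed shape of the global dict table, inferred from the merge HQL
    // in the next hunk; all concrete names and types here are assumptions.
    String exampleDictTableDdl =
            "CREATE TABLE IF NOT EXISTS global_dict_db.demo_cube_global_dict (\n"
                    + "  dict_key STRING,\n"
                    + "  dict_val STRING\n"
                    + ") PARTITIONED BY (dict_column STRING)";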
@@ -180,49 +204,43 @@ public class HiveInputBase {
             return step;
         }
 
-        protected static AbstractExecutable createMrHiveGlobalDictReplaceStep(IJoinedFlatTableDesc flatDesc,
-                String hiveInitStatements, String hdfsWorkingDir, String cubeName, String[] mrHiveDictColumns,
-                String flatTableDatabase, String globalDictDatabase, String globalDictTable) {
+        protected static AbstractExecutable createHiveGlobalDictMergeGlobalDict(IJoinedFlatTableDesc flatDesc,
+                String hiveInitStatements, String cubeName, String[] mrHiveDictColumns,
+                String globalDictDatabase, String globalDictTable) {
+
+            String globalDictIntermediateTable = MRHiveDictUtil.getMRHiveFlatTableGlobalDictTableName(flatDesc);
+
+            StringBuffer addPartition = new StringBuffer();
+            Map<String, String> maxDictValMap = new HashMap<>();
             Map<String, String> dictHqlMap = new HashMap<>();
             for (String dictColumn : mrHiveDictColumns) {
-                StringBuilder dictHql = new StringBuilder();
-                TblColRef dictColumnRef = null;
-
-                String flatTable = flatTableDatabase + "." + flatDesc.getTableName();
-                // replace the flat table's dict column value
-                dictHql.append("INSERT OVERWRITE TABLE " + flatTable + " \n");
                 try {
-                    dictHql.append("SELECT \n");
-                    Integer flatTableColumnSize = 
flatDesc.getAllColumns().size();
-                    for (int i = 0; i < flatTableColumnSize; i++) {
-                        TblColRef tblColRef = flatDesc.getAllColumns().get(i);
-                        if (i > 0) {
-                            dictHql.append(",");
-                        }
-                        if (JoinedFlatTable.colName(tblColRef, 
flatDesc.useAlias()).equalsIgnoreCase(dictColumn)) {
-                            dictHql.append("b. dict_val \n");
-                            dictColumnRef = tblColRef;
-                        } else {
-                            dictHql.append("a." + 
JoinedFlatTable.colName(tblColRef) + " \n");
-                        }
-                    }
-                    dictHql.append("FROM " + flatTable + " a \n" + "LEFT OUTER 
JOIN \n" + "( \n"
-                            + "SELECT dict_key, dict_val FROM " + 
globalDictDatabase + "." + globalDictTable
-                            + " WHERE dict_column = '" + dictColumn + "' \n" + 
") b \n" + " ON a."
-                            + JoinedFlatTable.colName(dictColumnRef) + " = 
b.dict_key;");
-                    dictHqlMap.put(dictColumn, dictHql.toString());
+                    addPartition.append("alter table ").append(globalDictIntermediateTable)
+                            .append(" add IF NOT EXISTS partition (dict_column='").append(dictColumn)
+                            .append("');").append(" \n");
+
+                    String dictHql = "INSERT OVERWRITE TABLE " + globalDictDatabase + "." + globalDictTable + " \n"
+                            + "PARTITION (dict_column = '" + dictColumn + "') \n"
+                            + "SELECT dict_key, dict_val FROM "
+                            + globalDictDatabase + "." + globalDictTable + " \n" + "WHERE dict_column = '" + dictColumn
+                            + "' \n" + flatDesc.getDataModel().getConfig().getHiveUnionStyle() + " \n"
+                            + "SELECT dict_key, dict_val FROM "
+                            + globalDictIntermediateTable + " \n" + " WHERE dict_column = '" + dictColumn + "' ;\n";
+                    dictHqlMap.put(dictColumn, dictHql);
                 } catch (Exception e) {
                     logger.error("", e);
                 }
             }
+            String hiveInitStatementForNonStrict = "set hive.mapred.mode=nonstrict;";
             CreateMrHiveDictStep step = new CreateMrHiveDictStep();
-            step.setInitStatement(hiveInitStatements);
+            step.setInitStatement(hiveInitStatements + hiveInitStatementForNonStrict + addPartition);
             step.setCreateTableStatementMap(dictHqlMap);
-            step.setIsUnLock(true);
+            step.setMaxDictStatementMap(maxDictValMap);
+            step.setIsLock(false);
+            step.setIsUnLock(false);
             step.setLockPathName(cubeName);
-
             CubingExecutableUtil.setCubeName(cubeName, step.getParams());
-            step.setName(ExecutableConstants.STEP_NAME_GLOBAL_DICT_MRHIVE_REPLACE_DICTVAL);
+            step.setName(ExecutableConstants.STEP_NAME_GLOBAL_DICT_MRHIVE_BUILD_DICTVAL);
             return step;
         }
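
To make the merge step concrete: for a hypothetical cube demo_cube with dict
column USER_ID, a _global_dict table suffix, UNION ALL as the configured
union style, and an intermediate dict table named
demo_cube_global_dict_intermediate (all of these values are assumptions, not
taken from the patch), the loop above would emit HQL along these lines:

    // Illustrative only: approximate HQL produced by the dictHql builder
    // above for one dict column; every concrete identifier is an assumption.
    String exampleMergeHql =
            "INSERT OVERWRITE TABLE global_dict_db.demo_cube_global_dict \n"
                    + "PARTITION (dict_column = 'USER_ID') \n"
                    + "SELECT dict_key, dict_val FROM global_dict_db.demo_cube_global_dict \n"
                    + "WHERE dict_column = 'USER_ID' \n"
                    + "UNION ALL \n"
                    + "SELECT dict_key, dict_val FROM demo_cube_global_dict_intermediate \n"
                    + " WHERE dict_column = 'USER_ID' ;\n";

Because each statement reads from and overwrites the same partition of the
same table, the step also prepends the hive.mapred.mode init statement shown
above, presumably to relax Hive's strict-mode restrictions on such queries.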
 
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaInputBase.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaInputBase.java
index d9e112c..5ba9082 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaInputBase.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaInputBase.java
@@ -88,6 +88,11 @@ public class KafkaInputBase {
             }
         }
 
+        @Override
+        public void addStepPhase_ReplaceFlatTableGlobalColumnValue(DefaultChainedExecutable jobFlow) {
+            // do nothing
+        }
+
         protected String getJobWorkingDir(DefaultChainedExecutable jobFlow) {
             return JobBuilderSupport.getJobWorkingDir(config.getHdfsWorkingDirectory(), jobFlow.getId());
         }
