This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 85b78327d1869c021c6d47ed2eb57b21b2557b2c
Author: wangxiaojing <wangxiaoj...@didichuxing.com>
AuthorDate: Wed May 6 14:39:03 2020 +0800

    KYLIN-4345 Build Global Dict by MR/Hive, Parallel Part Build
     Step implementation
---
 .../kylin/job/constant/ExecutableConstants.java    |   1 +
 .../kylin/engine/mr/BatchCubingJobBuilder2.java    |  18 ++-
 .../apache/kylin/engine/mr/JobBuilderSupport.java  |  21 +++
 .../steps/BuildGlobalHiveDicPartBuildReducer.java  |  85 +++++++++++
 .../steps/BuildGlobalHiveDicPartPartitioner.java   |  73 ++++++++++
 .../mr/steps/BuildGlobalHiveDictPartBuildJob.java  | 156 +++++++++++++++++++++
 .../steps/BuildGlobalHiveDictPartBuildMapper.java  | 109 ++++++++++++++
 7 files changed, 462 insertions(+), 1 deletion(-)

diff --git 
a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java 
b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index 73c5ecc..b2d087b 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -86,6 +86,7 @@ public final class ExecutableConstants {
 
     // MR - Hive Dict
     public static final String STEP_NAME_GLOBAL_DICT_MRHIVE_EXTRACT_DICTVAL = 
"Build Global Dict - extract distinct value from data";
+    public static final String STEP_NAME_GLOBAL_DICT_PART_BUILD_DICTVAL = 
"Build Global Dict - parallel part build";
     public static final String STEP_NAME_GLOBAL_DICT_MRHIVE_BUILD_DICTVAL = 
"Build Global Dict - merge to dict table";
     public static final String STEP_NAME_GLOBAL_DICT_MRHIVE_REPLACE_DICTVAL = 
"Build Global Dict - replace intermediate table";
 
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index c566a13..0aae61d 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -19,7 +19,8 @@
 package org.apache.kylin.engine.mr;
 
 import java.util.List;
-
+import java.util.Objects;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.CuboidUtil;
 import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
@@ -58,6 +59,21 @@ public class BatchCubingJobBuilder2 extends 
JobBuilderSupport {
         // Phase 1: Create Flat Table & Materialize Hive View in Lookup Tables
         inputSide.addStepPhase1_CreateFlatTable(result);
 
+        // build global dict
+        KylinConfig dictConfig = seg.getConfig();
+        String[] mrHiveDictColumns = 
dictConfig.getMrHiveDictColumnsExcludeRefColumns();
+
+        if (Objects.nonNull(mrHiveDictColumns) && mrHiveDictColumns.length > 0
+                && !"".equals(mrHiveDictColumns[0])) {
+
+            //parallel part build
+            result.addTask(createBuildGlobalHiveDictPartBuildJob(jobId));
+
+            //toDo parallel total build
+        }
+
+        //toDo merge global dic and replace flat table
+
         // Phase 2: Build Dictionary
         result.addTask(createFactDistinctColumnsStep(jobId));
 
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 6c236a5..dddb67d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -36,6 +36,7 @@ import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.BuildGlobalHiveDictPartBuildJob;
 import org.apache.kylin.engine.mr.steps.CalculateStatsFromBaseCuboidJob;
 import org.apache.kylin.engine.mr.steps.CreateDictionaryJob;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
@@ -204,6 +205,22 @@ public class JobBuilderSupport {
         return result;
     }
 
+
+    public MapReduceExecutable createBuildGlobalHiveDictPartBuildJob(String 
jobId) {
+        MapReduceExecutable result = new MapReduceExecutable();
+        
result.setName(ExecutableConstants.STEP_NAME_GLOBAL_DICT_PART_BUILD_DICTVAL);
+        result.setMapReduceJobClass(BuildGlobalHiveDictPartBuildJob.class);
+        StringBuilder cmd = new StringBuilder();
+        appendMapReduceParameters(cmd);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, 
seg.getRealization().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, 
seg.getUuid());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME,
+                ExecutableConstants.STEP_NAME_GLOBAL_DICT_PART_BUILD_DICTVAL + 
seg.getRealization().getName() + "_Step");
+        appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, 
getBuildGlobalDictionaryBasePath(jobId));
+        result.setMapReduceParams(cmd.toString());
+        return result;
+    }
+
     public UpdateCubeInfoAfterBuildStep 
createUpdateCubeInfoAfterBuildStep(String jobId, LookupMaterializeContext 
lookupMaterializeContext) {
         final UpdateCubeInfoAfterBuildStep result = new 
UpdateCubeInfoAfterBuildStep();
         result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
@@ -341,6 +358,10 @@ public class JobBuilderSupport {
         return getRealizationRootPath(jobId) + "/dictionary_shrunken";
     }
 
+    public String getBuildGlobalDictionaryBasePath(String jobId) {
+        return getRealizationRootPath(jobId) + "/global_dic";
+    }
+
     public String getDictRootPath(String jobId) {
         return getRealizationRootPath(jobId) + "/dict";
     }
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicPartBuildReducer.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicPartBuildReducer.java
new file mode 100644
index 0000000..7ecf16d
--- /dev/null
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicPartBuildReducer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.engine.mr.KylinReducer;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BuildGlobalHiveDicPartBuildReducer extends KylinReducer<Text, 
LongWritable, LongWritable, Text> {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(BuildGlobalHiveDicPartBuildReducer.class);
+
+    private Long count=0L;
+    private MultipleOutputs mos;
+    private String[] dicCols;
+    private String colName;
+    private int colIndex;
+
+    @Override
+    protected void doSetup(Context context) throws IOException, 
InterruptedException {
+        mos = new MultipleOutputs(context);
+        KylinConfig config;
+        try {
+            config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        dicCols = config.getMrHiveDictColumnsExcludeRefColumns();
+    }
+
+    @Override
+    public void doReduce(Text key, Iterable<LongWritable> values, Context 
context)
+            throws IOException, InterruptedException {
+        count++;
+        byte[] keyBytes = Bytes.copy(key.getBytes(), 1, key.getLength() - 1);
+        if(count==1){
+            colIndex = key.getBytes()[0];//col index
+            colName = dicCols[colIndex];
+        }
+        logAFewRows(key.toString());
+        mos.write(colIndex+"", new LongWritable(count), new Text(keyBytes), 
"part_sort/"+colIndex);
+    }
+
+    private void logAFewRows(String value) {
+        if(count<10){
+            logger.info("key:{}, temp dict num :{},colIndex:{},colName:{}", 
value, count, colIndex, colName);
+        }
+    }
+
+    @Override
+    protected void doCleanup(Context context) throws IOException, 
InterruptedException {
+        Configuration conf = context.getConfiguration();
+
+        String partition = conf.get(MRJobConfig.TASK_PARTITION);
+        mos.write(colIndex + "", new LongWritable(count), new Text(partition), 
"reduce_stats/" + colIndex);
+        mos.close();
+        logger.info("Reduce partition num {} finish,this reduce done item 
count is {}" , partition, count);
+    }
+
+}
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicPartPartitioner.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicPartPartitioner.java
new file mode 100644
index 0000000..97ad4f4
--- /dev/null
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicPartPartitioner.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+
+public class BuildGlobalHiveDicPartPartitioner extends Partitioner<Text, 
NullWritable> implements Configurable {
+    private Configuration conf;
+
+    private Integer[] reduceNumArr;
+
+    @Override
+    public void setConf(Configuration configuration) {
+        this.conf = configuration;
+
+        KylinConfig config;
+        try {
+            config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        reduceNumArr = config.getMrHiveDictColumnsReduceNumExcludeRefCols();
+    }
+
+    @Override
+    public int getPartition(Text key, NullWritable value, int numReduceTasks) {
+        //get first byte, the first byte value is the dic col index ,start 
from 0
+        int colIndex = key.getBytes()[0];
+        int colReduceNum = reduceNumArr[colIndex];
+
+        int colReduceNumOffset = 0;
+        for (int i=0;i<colIndex;i++){
+            colReduceNumOffset += reduceNumArr[i] ;
+        }
+
+        //Calculate reduce number , reduce num = (value.hash % colReduceNum) + 
colReduceNumOffset
+        byte[] keyBytes = Bytes.copy(key.getBytes(), 1, key.getLength() - 1);
+        int hashCode = new Text(keyBytes).hashCode() &  0x7FFFFFFF ;
+        int reduceNo = hashCode % colReduceNum + colReduceNumOffset;
+
+        return reduceNo;
+    }
+
+    @Override
+    public Configuration getConf() {
+        return conf;
+    }
+}
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildJob.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildJob.java
new file mode 100644
index 0000000..07b0824
--- /dev/null
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildJob.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BuildGlobalHiveDictPartBuildJob extends AbstractHadoopJob {
+    protected static final Logger logger = 
LoggerFactory.getLogger(BuildGlobalHiveDictPartBuildJob.class);
+
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+        String[] dicColsArr=null;
+
+        try {
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_OUTPUT_PATH);
+            options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_SEGMENT_ID);
+            parseOptions(options, args);
+
+            KylinConfig config = KylinConfig.getInstanceFromEnv();
+            dicColsArr = config.getMrHiveDictColumnsExcludeRefColumns();
+
+            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+
+            // add metadata to distributed cache
+            String cubeName = getOptionValue(OPTION_CUBE_NAME);
+            String segmentID = getOptionValue(OPTION_SEGMENT_ID);
+            CubeManager cubeMgr = CubeManager.getInstance(config);
+            CubeInstance cube = cubeMgr.getCube(cubeName);
+            CubeSegment segment = cube.getSegmentById(segmentID);
+
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, 
segmentID);
+
+            logger.info("Starting: " + job.getJobName());
+
+            job.setJarByClass(BuildGlobalHiveDictPartBuildJob.class);
+
+            setJobClasspath(job, cube.getConfig());
+
+            //FileInputFormat.setInputPaths(job, input);
+            setInputput(job, dicColsArr, getInputPath(config, segment));
+
+            // make each reducer output to respective dir
+            setOutput(job, dicColsArr, getOptionValue(OPTION_OUTPUT_PATH));
+            
job.getConfiguration().setBoolean("mapreduce.output.fileoutputformat.compress", 
false);
+
+            //set reduce num
+            setReduceNum(job, config);
+
+            job.setInputFormatClass(KeyValueTextInputFormat.class);
+            job.setMapOutputKeyClass(Text.class);
+            job.setMapOutputValueClass(NullWritable.class);
+            job.setOutputKeyClass(LongWritable.class);
+            job.setOutputValueClass(Text.class);
+
+            job.setMapperClass(BuildGlobalHiveDictPartBuildMapper.class);
+
+            job.setPartitionerClass(BuildGlobalHiveDicPartPartitioner.class);
+            job.setReducerClass(BuildGlobalHiveDicPartBuildReducer.class);
+
+            // prevent to create zero-sized default output
+            LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
+
+            //delete output
+            Path baseOutputPath =new Path(getOptionValue(OPTION_OUTPUT_PATH));
+            deletePath(job.getConfiguration(), baseOutputPath);
+
+            attachSegmentMetadataWithDict(segment, job.getConfiguration());
+            return waitForCompletion(job);
+        } finally {
+            if (job != null)
+                cleanupTempConfFile(job.getConfiguration());
+        }
+    }
+
+    private void setOutput(Job job, String[] dicColsArry, String outputBase){
+        // make each reducer output to respective dir
+        //eg: 
/user/kylin/tmp/kylin/globaldic_test/kylin-188c9f9d_dabb_944e_9f20_99dc95be66e6/kylin_sales_cube_mr/dict_column=KYLIN_SALES_SELLER_ID/part_sort
+        for(int i=0;i<dicColsArry.length;i++){
+            MultipleOutputs.addNamedOutput(job, i + "", 
TextOutputFormat.class, LongWritable.class, Text.class);
+        }
+        Path outputPath=new Path(outputBase);
+        FileOutputFormat.setOutputPath(job, outputPath);
+    }
+
+    private void setInputput(Job job, String[] dicColsArray, String inputBase) 
throws IOException {
+        StringBuffer paths=new StringBuffer();
+        // make each reducer output to respective dir
+        for(String col:dicColsArray){
+            
paths.append(inputBase).append("/dict_column=").append(col).append(",");
+        }
+
+        paths.delete(paths.length() - 1, paths.length());
+        FileInputFormat.setInputPaths(job, paths.toString());
+
+    }
+
+    private void setReduceNum(Job job, KylinConfig config){
+        Integer[] reduceNumArr = 
config.getMrHiveDictColumnsReduceNumExcludeRefCols();
+        int totalReduceNum = 0;
+        for(Integer num:reduceNumArr){
+            totalReduceNum +=num;
+        }
+        logger.info("BuildGlobalHiveDictPartBuildJob total reduce num is {}", 
totalReduceNum);
+        job.setNumReduceTasks(totalReduceNum);
+    }
+
+    private String getInputPath(KylinConfig config, CubeSegment segment){
+        String dbDir = config.getHiveDatabaseDir();
+        String tableName = 
EngineFactory.getJoinedFlatTableDesc(segment).getTableName()+config.getMrHiveDictIntermediateTTableSuffix();
+        String input = dbDir+"/"+tableName;
+        logger.info("part build base input path:"+input);
+        return input;
+    }
+
+}
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildMapper.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildMapper.java
new file mode 100644
index 0000000..54708f3
--- /dev/null
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildMapper.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BuildGlobalHiveDictPartBuildMapper<KEYIN, Object> extends 
KylinMapper<KEYIN, Object, Text, NullWritable> {
+    private static final Logger logger = 
LoggerFactory.getLogger(BuildGlobalHiveDictPartBuildMapper.class);
+
+    private Integer colIndex;
+    private ByteBuffer tmpbuf;
+    private Text outputKey = new Text();
+    private long count = 0L;
+
+    @Override
+    protected void doSetup(Context context) throws IOException, 
InterruptedException {
+        tmpbuf = ByteBuffer.allocate(64);
+
+        KylinConfig config;
+        try {
+            config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        String[] dicCols = config.getMrHiveDictColumnsExcludeRefColumns();
+        logger.info("kylin.dictionary.mr-hive.columns: exclude ref cols {}", 
dicCols);
+
+        //eg: 
/user/kylin/warehouse/db/kylin_intermediate_kylin_sales_cube_mr_6222c210_ce2d_e8ce_dd0f_f12c38fa9115__group_by/dict_column=KYLIN_SALES_SELLER_ID/part-000
+        FileSplit fileSplit = (FileSplit) context.getInputSplit();
+        //eg: dict_column=KYLIN_SALES_SELLER_ID
+        String name = fileSplit.getPath().getParent().getName();
+        logger.info("this map file name :{}", name);
+
+        //eg: KYLIN_SALES_SELLER_ID
+        String colName = name.split("=")[1];
+        logger.info("this map build col name :{}", colName);
+
+        for(int i=0;i<dicCols.length;i++){
+            if(dicCols[i].equalsIgnoreCase(colName)){
+                colIndex=i;
+            }
+        }
+        if(colIndex<0 || colIndex>127){
+            logger.error("kylin.dictionary.mr-hive.columns colIndex :{} error 
", colIndex);
+            logger.error("kylin.dictionary.mr-hive.columns set error,mr-hive 
columns's count should less than 128");
+        }
+        logger.info("this map build col index :{}", colIndex);
+
+    }
+
+    @Override
+    public void doMap(KEYIN key, Object record, Context context) throws 
IOException, InterruptedException {
+        count ++;
+        writeFieldValue(context, key.toString());
+    }
+
+
+    private void writeFieldValue(Context context, String value)
+            throws IOException, InterruptedException {
+        tmpbuf.clear();
+        byte[] valueBytes = Bytes.toBytes(value);
+        int size = valueBytes.length + 1;
+        if (size >= tmpbuf.capacity()) {
+            tmpbuf = ByteBuffer.allocate(countNewSize(tmpbuf.capacity(), 
size));
+        }
+        tmpbuf.put(colIndex.byteValue());//colIndex should less than 128
+        tmpbuf.put(valueBytes);
+        outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
+        context.write(outputKey, NullWritable.get());
+        if(count<10){
+            logger.info("colIndex:{},input key:{}", colIndex, value);
+        }
+    }
+
+    private int countNewSize(int oldSize, int dataSize) {
+        int newSize = oldSize * 2;
+        while (newSize < dataSize) {
+            newSize = newSize * 2;
+        }
+        return newSize;
+    }
+}

Reply via email to