This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 85b78327d1869c021c6d47ed2eb57b21b2557b2c
Author: wangxiaojing <wangxiaoj...@didichuxing.com>
AuthorDate: Wed May 6 14:39:03 2020 +0800

    KYLIN-4345 Build Global Dict by MR/Hive, Parallel Part Build Step implementation
---
 .../kylin/job/constant/ExecutableConstants.java    |   1 +
 .../kylin/engine/mr/BatchCubingJobBuilder2.java    |  18 ++-
 .../apache/kylin/engine/mr/JobBuilderSupport.java  |  21 +++
 .../steps/BuildGlobalHiveDicPartBuildReducer.java  |  85 +++++++++++
 .../steps/BuildGlobalHiveDicPartPartitioner.java   |  73 ++++++++++
 .../mr/steps/BuildGlobalHiveDictPartBuildJob.java  | 156 +++++++++++++++++++++
 .../steps/BuildGlobalHiveDictPartBuildMapper.java  | 109 ++++++++++++++
 7 files changed, 462 insertions(+), 1 deletion(-)

diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index 73c5ecc..b2d087b 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -86,6 +86,7 @@ public final class ExecutableConstants {
 
     // MR - Hive Dict
     public static final String STEP_NAME_GLOBAL_DICT_MRHIVE_EXTRACT_DICTVAL = "Build Global Dict - extract distinct value from data";
+    public static final String STEP_NAME_GLOBAL_DICT_PART_BUILD_DICTVAL = "Build Global Dict - parallel part build";
     public static final String STEP_NAME_GLOBAL_DICT_MRHIVE_BUILD_DICTVAL = "Build Global Dict - merge to dict table";
     public static final String STEP_NAME_GLOBAL_DICT_MRHIVE_REPLACE_DICTVAL = "Build Global Dict - replace intermediate table";
 
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index c566a13..0aae61d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -19,7 +19,8 @@
 package org.apache.kylin.engine.mr;
 
 import java.util.List;
-
+import java.util.Objects;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.CuboidUtil;
 import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
@@ -58,6 +59,21 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         // Phase 1: Create Flat Table & Materialize Hive View in Lookup Tables
         inputSide.addStepPhase1_CreateFlatTable(result);
 
+        // build global dict
+        KylinConfig dictConfig = seg.getConfig();
+        String[] mrHiveDictColumns = dictConfig.getMrHiveDictColumnsExcludeRefColumns();
+
+        if (Objects.nonNull(mrHiveDictColumns) && mrHiveDictColumns.length > 0
+                && !"".equals(mrHiveDictColumns[0])) {
+
+            //parallel part build
+            result.addTask(createBuildGlobalHiveDictPartBuildJob(jobId));
+
+            //toDo parallel total build
+        }
+
+        //toDo merge global dic and replace flat table
+
         // Phase 2: Build Dictionary
         result.addTask(createFactDistinctColumnsStep(jobId));
 
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 6c236a5..dddb67d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -36,6 +36,7 @@ import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.BuildGlobalHiveDictPartBuildJob;
 import org.apache.kylin.engine.mr.steps.CalculateStatsFromBaseCuboidJob;
 import org.apache.kylin.engine.mr.steps.CreateDictionaryJob;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
@@ -204,6 +205,22 @@ public class JobBuilderSupport {
         return result;
     }
 
+    public MapReduceExecutable createBuildGlobalHiveDictPartBuildJob(String jobId) {
+        MapReduceExecutable result = new MapReduceExecutable();
+        result.setName(ExecutableConstants.STEP_NAME_GLOBAL_DICT_PART_BUILD_DICTVAL);
+        result.setMapReduceJobClass(BuildGlobalHiveDictPartBuildJob.class);
+        StringBuilder cmd = new StringBuilder();
+        appendMapReduceParameters(cmd);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME,
+                ExecutableConstants.STEP_NAME_GLOBAL_DICT_PART_BUILD_DICTVAL + seg.getRealization().getName() + "_Step");
+        appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, getBuildGlobalDictionaryBasePath(jobId));
+        result.setMapReduceParams(cmd.toString());
+        return result;
+    }
+
     public UpdateCubeInfoAfterBuildStep createUpdateCubeInfoAfterBuildStep(String jobId, LookupMaterializeContext lookupMaterializeContext) {
         final UpdateCubeInfoAfterBuildStep result = new UpdateCubeInfoAfterBuildStep();
         result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
@@ -341,6 +358,10 @@ public class JobBuilderSupport {
         return getRealizationRootPath(jobId) + "/dictionary_shrunken";
     }
 
+    public String getBuildGlobalDictionaryBasePath(String jobId) {
+        return getRealizationRootPath(jobId) + "/global_dic";
+    }
+
     public String getDictRootPath(String jobId) {
         return getRealizationRootPath(jobId) + "/dict";
     }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicPartBuildReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicPartBuildReducer.java
new file mode 100644
index 0000000..7ecf16d
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicPartBuildReducer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.engine.mr.KylinReducer;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BuildGlobalHiveDicPartBuildReducer extends KylinReducer<Text, LongWritable, LongWritable, Text> {
+
+    private static final Logger logger = LoggerFactory.getLogger(BuildGlobalHiveDicPartBuildReducer.class);
+
+    private Long count=0L;
+    private MultipleOutputs mos;
+    private String[] dicCols;
+    private String colName;
+    private int colIndex;
+
+    @Override
+    protected void doSetup(Context context) throws IOException, InterruptedException {
+        mos = new MultipleOutputs(context);
+        KylinConfig config;
+        try {
+            config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        dicCols = config.getMrHiveDictColumnsExcludeRefColumns();
+    }
+
+    @Override
+    public void doReduce(Text key, Iterable<LongWritable> values, Context context)
+            throws IOException, InterruptedException {
+        count++;
+        byte[] keyBytes = Bytes.copy(key.getBytes(), 1, key.getLength() - 1);
+        if(count==1){
+            colIndex = key.getBytes()[0];//col index
+            colName = dicCols[colIndex];
+        }
+        logAFewRows(key.toString());
+        mos.write(colIndex+"", new LongWritable(count), new Text(keyBytes), "part_sort/"+colIndex);
+    }
+
+    private void logAFewRows(String value) {
+        if(count<10){
+            logger.info("key:{}, temp dict num :{},colIndex:{},colName:{}", value, count, colIndex, colName);
+        }
+    }
+
+    @Override
+    protected void doCleanup(Context context) throws IOException, InterruptedException {
+        Configuration conf = context.getConfiguration();
+
+        String partition = conf.get(MRJobConfig.TASK_PARTITION);
+        mos.write(colIndex + "", new LongWritable(count), new Text(partition), "reduce_stats/" + colIndex);
+        mos.close();
+        logger.info("Reduce partition num {} finish,this reduce done item count is {}" , partition, count);
+    }
+
+}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicPartPartitioner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicPartPartitioner.java
new file mode 100644
index 0000000..97ad4f4
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicPartPartitioner.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+
+public class BuildGlobalHiveDicPartPartitioner extends Partitioner<Text, NullWritable> implements Configurable {
+    private Configuration conf;
+
+    private Integer[] reduceNumArr;
+
+    @Override
+    public void setConf(Configuration configuration) {
+        this.conf = configuration;
+
+        KylinConfig config;
+        try {
+            config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        reduceNumArr = config.getMrHiveDictColumnsReduceNumExcludeRefCols();
+    }
+
+    @Override
+    public int getPartition(Text key, NullWritable value, int numReduceTasks) {
+        //get first byte, the first byte value is the dic col index ,start from 0
+        int colIndex = key.getBytes()[0];
+        int colReduceNum = reduceNumArr[colIndex];
+
+        int colReduceNumOffset = 0;
+        for (int i=0;i<colIndex;i++){
+            colReduceNumOffset += reduceNumArr[i] ;
+        }
+
+        //Calculate reduce number , reduce num = (value.hash % colReduceNum) + colReduceNumOffset
+        byte[] keyBytes = Bytes.copy(key.getBytes(), 1, key.getLength() - 1);
+        int hashCode = new Text(keyBytes).hashCode() & 0x7FFFFFFF ;
+        int reduceNo = hashCode % colReduceNum + colReduceNumOffset;
+
+        return reduceNo;
+    }
+
+    @Override
+    public Configuration getConf() {
+        return conf;
+    }
+}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildJob.java
new file mode 100644
index 0000000..07b0824
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildJob.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BuildGlobalHiveDictPartBuildJob extends AbstractHadoopJob {
+    protected static final Logger logger = LoggerFactory.getLogger(BuildGlobalHiveDictPartBuildJob.class);
+
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+        String[] dicColsArr=null;
+
+        try {
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_OUTPUT_PATH);
+            options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_SEGMENT_ID);
+            parseOptions(options, args);
+
+            KylinConfig config = KylinConfig.getInstanceFromEnv();
+            dicColsArr = config.getMrHiveDictColumnsExcludeRefColumns();
+
+            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+
+            // add metadata to distributed cache
+            String cubeName = getOptionValue(OPTION_CUBE_NAME);
+            String segmentID = getOptionValue(OPTION_SEGMENT_ID);
+            CubeManager cubeMgr = CubeManager.getInstance(config);
+            CubeInstance cube = cubeMgr.getCube(cubeName);
+            CubeSegment segment = cube.getSegmentById(segmentID);
+
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
+
+            logger.info("Starting: " + job.getJobName());
+
+            job.setJarByClass(BuildGlobalHiveDictPartBuildJob.class);
+
+            setJobClasspath(job, cube.getConfig());
+
+            //FileInputFormat.setInputPaths(job, input);
+            setInputput(job, dicColsArr, getInputPath(config, segment));
+
+            // make each reducer output to respective dir
+            setOutput(job, dicColsArr, getOptionValue(OPTION_OUTPUT_PATH));
+            job.getConfiguration().setBoolean("mapreduce.output.fileoutputformat.compress", false);
+
+            //set reduce num
+            setReduceNum(job, config);
+
+            job.setInputFormatClass(KeyValueTextInputFormat.class);
+            job.setMapOutputKeyClass(Text.class);
+            job.setMapOutputValueClass(NullWritable.class);
+            job.setOutputKeyClass(LongWritable.class);
+            job.setOutputValueClass(Text.class);
+
+            job.setMapperClass(BuildGlobalHiveDictPartBuildMapper.class);
+
+            job.setPartitionerClass(BuildGlobalHiveDicPartPartitioner.class);
+            job.setReducerClass(BuildGlobalHiveDicPartBuildReducer.class);
+
+            // prevent to create zero-sized default output
+            LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
+
+            //delete output
+            Path baseOutputPath =new Path(getOptionValue(OPTION_OUTPUT_PATH));
+            deletePath(job.getConfiguration(), baseOutputPath);
+
+            attachSegmentMetadataWithDict(segment, job.getConfiguration());
+            return waitForCompletion(job);
+        } finally {
+            if (job != null)
+                cleanupTempConfFile(job.getConfiguration());
+        }
+    }
+
+    private void setOutput(Job job, String[] dicColsArry, String outputBase){
+        // make each reducer output to respective dir
+        //eg: /user/kylin/tmp/kylin/globaldic_test/kylin-188c9f9d_dabb_944e_9f20_99dc95be66e6/kylin_sales_cube_mr/dict_column=KYLIN_SALES_SELLER_ID/part_sort
+        for(int i=0;i<dicColsArry.length;i++){
+            MultipleOutputs.addNamedOutput(job, i + "", TextOutputFormat.class, LongWritable.class, Text.class);
+        }
+        Path outputPath=new Path(outputBase);
+        FileOutputFormat.setOutputPath(job, outputPath);
+    }
+
+    private void setInputput(Job job, String[] dicColsArray, String inputBase) throws IOException {
+        StringBuffer paths=new StringBuffer();
+        // make each reducer output to respective dir
+        for(String col:dicColsArray){
+            paths.append(inputBase).append("/dict_column=").append(col).append(",");
+        }
+
+        paths.delete(paths.length() - 1, paths.length());
+        FileInputFormat.setInputPaths(job, paths.toString());
+
+    }
+
+    private void setReduceNum(Job job, KylinConfig config){
+        Integer[] reduceNumArr = config.getMrHiveDictColumnsReduceNumExcludeRefCols();
+        int totalReduceNum = 0;
+        for(Integer num:reduceNumArr){
+            totalReduceNum +=num;
+        }
+        logger.info("BuildGlobalHiveDictPartBuildJob total reduce num is {}", totalReduceNum);
+        job.setNumReduceTasks(totalReduceNum);
+    }
+
+    private String getInputPath(KylinConfig config, CubeSegment segment){
+        String dbDir = config.getHiveDatabaseDir();
+        String tableName = EngineFactory.getJoinedFlatTableDesc(segment).getTableName()+config.getMrHiveDictIntermediateTTableSuffix();
+        String input = dbDir+"/"+tableName;
+        logger.info("part build base input path:"+input);
+        return input;
+    }
+
+}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildMapper.java
new file mode 100644
index 0000000..54708f3
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildMapper.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BuildGlobalHiveDictPartBuildMapper<KEYIN, Object> extends KylinMapper<KEYIN, Object, Text, NullWritable> {
+    private static final Logger logger = LoggerFactory.getLogger(BuildGlobalHiveDictPartBuildMapper.class);
+
+    private Integer colIndex;
+    private ByteBuffer tmpbuf;
+    private Text outputKey = new Text();
+    private long count = 0L;
+
+    @Override
+    protected void doSetup(Context context) throws IOException, InterruptedException {
+        tmpbuf = ByteBuffer.allocate(64);
+
+        KylinConfig config;
+        try {
+            config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        String[] dicCols = config.getMrHiveDictColumnsExcludeRefColumns();
+        logger.info("kylin.dictionary.mr-hive.columns: exclude ref cols {}", dicCols);
+
+        //eg: /user/kylin/warehouse/db/kylin_intermediate_kylin_sales_cube_mr_6222c210_ce2d_e8ce_dd0f_f12c38fa9115__group_by/dict_column=KYLIN_SALES_SELLER_ID/part-000
+        FileSplit fileSplit = (FileSplit) context.getInputSplit();
+        //eg: dict_column=KYLIN_SALES_SELLER_ID
+        String name = fileSplit.getPath().getParent().getName();
+        logger.info("this map file name :{}", name);
+
+        //eg: KYLIN_SALES_SELLER_ID
+        String colName = name.split("=")[1];
+        logger.info("this map build col name :{}", colName);
+
+        for(int i=0;i<dicCols.length;i++){
+            if(dicCols[i].equalsIgnoreCase(colName)){
+                colIndex=i;
+            }
+        }
+        if(colIndex<0 || colIndex>127){
+            logger.error("kylin.dictionary.mr-hive.columns colIndex :{} error ", colIndex);
+            logger.error("kylin.dictionary.mr-hive.columns set error,mr-hive columns's count should less than 128");
+        }
+        logger.info("this map build col index :{}", colIndex);
+
+    }
+
+    @Override
+    public void doMap(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
+        count ++;
+        writeFieldValue(context, key.toString());
+    }
+
+
+    private void writeFieldValue(Context context, String value)
+            throws IOException, InterruptedException {
+        tmpbuf.clear();
+        byte[] valueBytes = Bytes.toBytes(value);
+        int size = valueBytes.length + 1;
+        if (size >= tmpbuf.capacity()) {
+            tmpbuf = ByteBuffer.allocate(countNewSize(tmpbuf.capacity(), size));
+        }
+        tmpbuf.put(colIndex.byteValue());//colIndex should less than 128
+        tmpbuf.put(valueBytes);
+        outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
+        context.write(outputKey, NullWritable.get());
+        if(count<10){
+            logger.info("colIndex:{},input key:{}", colIndex, value);
+        }
+    }
+
+    private int countNewSize(int oldSize, int dataSize) {
+        int newSize = oldSize * 2;
+        while (newSize < dataSize) {
+            newSize = newSize * 2;
+        }
+        return newSize;
+    }
+}