KYLIN-1245 Put layer cubing and in-mem cubing side by side, random switch between them
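[Illustrative note, not part of the commit] The core of this change is in BatchCubingJobBuilder2.build() below: Phase 3 now flips a coin between the classic layer-by-layer cubing steps and the single in-mem cubing step, and both variants write their cuboid files under a common cuboidRootPath that the storage output side later loads via addStepPhase3_BuildCube(jobFlow, cuboidRootPath). The following standalone Java sketch condenses that behaviour, including the per-level output directories produced by getCuboidOutputPaths(); the root path, rowkey column count, and build level count are made-up example values.

import java.util.Random;

public class CubingSwitchSketch {

    // Mirrors BatchCubingJobBuilder2.getCuboidOutputPaths(): one directory per layer,
    // base cuboid first, then (n-1)d, (n-2)d, ... down to the last aggregation level.
    static String[] getCuboidOutputPaths(String cuboidRootPath, int totalRowkeyColumnCount, int groupRowkeyColumnsCount) {
        String[] paths = new String[groupRowkeyColumnsCount + 1];
        for (int i = 0; i <= groupRowkeyColumnsCount; i++) {
            int dimNum = totalRowkeyColumnCount - i;
            paths[i] = (dimNum == totalRowkeyColumnCount)
                    ? cuboidRootPath + "base_cuboid"
                    : cuboidRootPath + dimNum + "d_cuboid";
        }
        return paths;
    }

    public static void main(String[] args) {
        String cuboidRootPath = "/kylin/kylin_metadata/kylin-job-1/example_cube/cuboid/"; // example only
        int totalRowkeyColumns = 5;   // example only
        int buildLevels = 3;          // example only

        if (new Random().nextBoolean()) {
            // layer cubing: one MR step per level, chained through intermediate directories
            for (String path : getCuboidOutputPaths(cuboidRootPath, totalRowkeyColumns, buildLevels)) {
                System.out.println("layer cubing output: " + path);
            }
        } else {
            // in-mem cubing: a single MR step writes all cuboids under the root path
            System.out.println("in-mem cubing output: " + cuboidRootPath);
        }
        // Either way, the output side's addStepPhase3_BuildCube(jobFlow, cuboidRootPath)
        // is responsible for loading the cuboid sequence files into storage.
    }
}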
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/3fc3883a Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/3fc3883a Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/3fc3883a Branch: refs/heads/2.x-staging Commit: 3fc3883a41bfab899c31b300db4c0757bd5589d3 Parents: 2ef13d0 Author: Yang Li <liy...@apache.org> Authored: Mon Dec 21 19:52:31 2015 +0800 Committer: Yang Li <liy...@apache.org> Committed: Mon Dec 21 19:52:31 2015 +0800 ---------------------------------------------------------------------- .../kylin/engine/mr/BatchCubingJobBuilder2.java | 97 +++++- .../kylin/engine/mr/BatchMergeJobBuilder2.java | 35 +- .../org/apache/kylin/engine/mr/IMROutput2.java | 104 +++--- .../java/org/apache/kylin/engine/mr/MRUtil.java | 6 - .../engine/mr/common/AbstractHadoopJob.java | 1 - .../kylin/engine/mr/steps/InMemCuboidJob.java | 201 ++++++++++- .../engine/mr/steps/InMemCuboidReducer.java | 28 +- .../mr/steps/MergeCuboidFromStorageJob.java | 94 ------ .../mr/steps/MergeCuboidFromStorageMapper.java | 239 ------------- .../apache/kylin/engine/spark/SparkCubing.java | 4 +- .../storage/hbase/steps/CreateHTableJob.java | 143 +------- .../storage/hbase/steps/HBaseMROutput2.java | 290 ---------------- .../hbase/steps/HBaseMROutput2Transition.java | 331 +------------------ 13 files changed, 378 insertions(+), 1195 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc3883a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java index f8fbc33..476a763 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java @@ -18,11 +18,16 @@ package org.apache.kylin.engine.mr; +import java.util.Random; + import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.model.RowKeyDesc; import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide; import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2; import org.apache.kylin.engine.mr.common.MapReduceExecutable; +import org.apache.kylin.engine.mr.steps.BaseCuboidJob; import org.apache.kylin.engine.mr.steps.InMemCuboidJob; +import org.apache.kylin.engine.mr.steps.NDCuboidJob; import org.apache.kylin.engine.mr.steps.SaveStatisticsStep; import org.apache.kylin.job.constant.ExecutableConstants; import org.slf4j.Logger; @@ -37,14 +42,15 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport { public BatchCubingJobBuilder2(CubeSegment newSegment, String submitter) { super(newSegment, submitter); this.inputSide = MRUtil.getBatchCubingInputSide(seg); - this.outputSide = MRUtil.getBatchCubingOutputSide2((CubeSegment)seg); + this.outputSide = MRUtil.getBatchCubingOutputSide2((CubeSegment) seg); } public CubingJob build() { logger.info("MR_V2 new job to BUILD segment " + seg); - - final CubingJob result = CubingJob.createBuildJob((CubeSegment)seg, submitter, config); + + final CubingJob result = CubingJob.createBuildJob((CubeSegment) seg, submitter, config); final String jobId = result.getId(); + final String cuboidRootPath = getCuboidRootPath(jobId); // Phase 1: Create Flat Table 
inputSide.addStepPhase1_CreateFlatTable(result); @@ -56,8 +62,14 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport { outputSide.addStepPhase2_BuildDictionary(result); // Phase 3: Build Cube - result.addTask(createInMemCubingStep(jobId)); - outputSide.addStepPhase3_BuildCube(result); + if (new Random().nextBoolean()) { + // layer cubing + addLayerCubingSteps(result, jobId, cuboidRootPath); + } else { + // or in-mem cubing + result.addTask(createInMemCubingStep(jobId, cuboidRootPath)); + } + outputSide.addStepPhase3_BuildCube(result, cuboidRootPath); // Phase 4: Update Metadata & Cleanup result.addTask(createUpdateCubeInfoAfterBuildStep(jobId)); @@ -67,6 +79,20 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport { return result; } + private void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) { + RowKeyDesc rowKeyDesc = ((CubeSegment) seg).getCubeDesc().getRowkey(); + final int groupRowkeyColumnsCount = ((CubeSegment) seg).getCubeDesc().getBuildLevel(); + final int totalRowkeyColumnsCount = rowKeyDesc.getRowKeyColumns().length; + final String[] cuboidOutputTempPath = getCuboidOutputPaths(cuboidRootPath, totalRowkeyColumnsCount, groupRowkeyColumnsCount); + // base cuboid step + result.addTask(createBaseCuboidStep(cuboidOutputTempPath, jobId)); + // n dim cuboid steps + for (int i = 1; i <= groupRowkeyColumnsCount; i++) { + int dimNum = totalRowkeyColumnsCount - i; + result.addTask(createNDimensionCuboidStep(cuboidOutputTempPath, dimNum, totalRowkeyColumnsCount)); + } + } + private SaveStatisticsStep createSaveStatisticsStep(String jobId) { SaveStatisticsStep result = new SaveStatisticsStep(); result.setName(ExecutableConstants.STEP_NAME_SAVE_STATISTICS); @@ -76,19 +102,19 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport { return result; } - private MapReduceExecutable createInMemCubingStep(String jobId) { + private MapReduceExecutable createInMemCubingStep(String jobId, String cuboidRootPath) { // base cuboid job MapReduceExecutable cubeStep = new MapReduceExecutable(); StringBuilder cmd = new StringBuilder(); - appendMapReduceParameters(cmd, ((CubeSegment)seg).getCubeDesc().getModel()); + appendMapReduceParameters(cmd, ((CubeSegment) seg).getCubeDesc().getModel()); cubeStep.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE); appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName()); appendExecCmdParameters(cmd, "segmentname", seg.getName()); + appendExecCmdParameters(cmd, "output", cuboidRootPath); appendExecCmdParameters(cmd, "jobname", "Kylin_Cube_Builder_" + seg.getRealization().getName()); - appendExecCmdParameters(cmd, "jobflowid", jobId); cubeStep.setMapReduceParams(cmd.toString()); cubeStep.setMapReduceJobClass(InMemCuboidJob.class); @@ -96,4 +122,59 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport { return cubeStep; } + private MapReduceExecutable createBaseCuboidStep(String[] cuboidOutputTempPath, String jobId) { + // base cuboid job + MapReduceExecutable baseCuboidStep = new MapReduceExecutable(); + + StringBuilder cmd = new StringBuilder(); + appendMapReduceParameters(cmd, ((CubeSegment) seg).getCubeDesc().getModel()); + + baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_BASE_CUBOID); + + appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName()); + appendExecCmdParameters(cmd, "segmentname", seg.getName()); + appendExecCmdParameters(cmd, "input", "FLAT_TABLE"); // marks flat table input + appendExecCmdParameters(cmd, "output", 
cuboidOutputTempPath[0]); + appendExecCmdParameters(cmd, "jobname", "Kylin_Base_Cuboid_Builder_" + seg.getRealization().getName()); + appendExecCmdParameters(cmd, "level", "0"); + + baseCuboidStep.setMapReduceParams(cmd.toString()); + baseCuboidStep.setMapReduceJobClass(BaseCuboidJob.class); + baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES); + return baseCuboidStep; + } + + private MapReduceExecutable createNDimensionCuboidStep(String[] cuboidOutputTempPath, int dimNum, int totalRowkeyColumnCount) { + // ND cuboid job + MapReduceExecutable ndCuboidStep = new MapReduceExecutable(); + + ndCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_N_D_CUBOID + " : " + dimNum + "-Dimension"); + StringBuilder cmd = new StringBuilder(); + + appendMapReduceParameters(cmd, ((CubeSegment) seg).getCubeDesc().getModel()); + appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName()); + appendExecCmdParameters(cmd, "segmentname", seg.getName()); + appendExecCmdParameters(cmd, "input", cuboidOutputTempPath[totalRowkeyColumnCount - dimNum - 1]); + appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[totalRowkeyColumnCount - dimNum]); + appendExecCmdParameters(cmd, "jobname", "Kylin_ND-Cuboid_Builder_" + seg.getRealization().getName() + "_Step"); + appendExecCmdParameters(cmd, "level", "" + (totalRowkeyColumnCount - dimNum)); + + ndCuboidStep.setMapReduceParams(cmd.toString()); + ndCuboidStep.setMapReduceJobClass(NDCuboidJob.class); + return ndCuboidStep; + } + + private String[] getCuboidOutputPaths(String cuboidRootPath, int totalRowkeyColumnCount, int groupRowkeyColumnsCount) { + String[] paths = new String[groupRowkeyColumnsCount + 1]; + for (int i = 0; i <= groupRowkeyColumnsCount; i++) { + int dimNum = totalRowkeyColumnCount - i; + if (dimNum == totalRowkeyColumnCount) { + paths[i] = cuboidRootPath + "base_cuboid"; + } else { + paths[i] = cuboidRootPath + dimNum + "d_cuboid"; + } + } + return paths; + } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc3883a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java index 48a717f..008d489 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java @@ -23,7 +23,7 @@ import java.util.List; import org.apache.kylin.common.util.StringUtil; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.mr.common.MapReduceExecutable; -import org.apache.kylin.engine.mr.steps.MergeCuboidFromStorageJob; +import org.apache.kylin.engine.mr.steps.MergeCuboidJob; import org.apache.kylin.engine.mr.steps.MergeStatisticsStep; import org.apache.kylin.job.constant.ExecutableConstants; import org.slf4j.Logger; @@ -39,23 +39,24 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport { public BatchMergeJobBuilder2(CubeSegment mergeSegment, String submitter) { super(mergeSegment, submitter); - this.outputSide = MRUtil.getBatchMergeOutputSide2((CubeSegment)seg); + this.outputSide = MRUtil.getBatchMergeOutputSide2((CubeSegment) seg); } public CubingJob build() { logger.info("MR_V2 new job to MERGE segment " + seg); - - final CubeSegment cubeSegment = (CubeSegment)seg; + + final CubeSegment cubeSegment = 
(CubeSegment) seg; final CubingJob result = CubingJob.createMergeJob(cubeSegment, submitter, config); final String jobId = result.getId(); + final String cuboidRootPath = getCuboidRootPath(jobId); final List<CubeSegment> mergingSegments = cubeSegment.getCubeInstance().getMergingSegments(cubeSegment); Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge"); final List<String> mergingSegmentIds = Lists.newArrayList(); - final List<String> mergingHTables = Lists.newArrayList(); + final List<String> mergingCuboidPaths = Lists.newArrayList(); for (CubeSegment merging : mergingSegments) { mergingSegmentIds.add(merging.getUuid()); - mergingHTables.add(merging.getStorageLocationIdentifier()); + mergingCuboidPaths.add(getCuboidRootPath(merging) + "*"); } // Phase 1: Merge Dictionary @@ -63,10 +64,10 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport { result.addTask(createMergeStatisticsStep(cubeSegment, mergingSegmentIds, getStatisticsPath(jobId))); outputSide.addStepPhase1_MergeDictionary(result); - // Phase 2: Merge Cube - String formattedTables = StringUtil.join(mergingHTables, ","); - result.addTask(createMergeCuboidDataFromStorageStep(formattedTables, jobId)); - outputSide.addStepPhase2_BuildCube(result); + // Phase 2: Merge Cube Files + String formattedPath = StringUtil.join(mergingCuboidPaths, ","); + result.addTask(createMergeCuboidDataStep(cubeSegment, formattedPath, cuboidRootPath)); + outputSide.addStepPhase2_BuildCube(result, cuboidRootPath); // Phase 3: Update Metadata & Cleanup result.addTask(createUpdateCubeInfoAfterMergeStep(mergingSegmentIds, jobId)); @@ -85,20 +86,20 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport { return result; } - private MapReduceExecutable createMergeCuboidDataFromStorageStep(String inputTableNames, String jobId) { + private MapReduceExecutable createMergeCuboidDataStep(CubeSegment seg, String inputPath, String outputPath) { MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable(); mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID); StringBuilder cmd = new StringBuilder(); - appendMapReduceParameters(cmd, ((CubeSegment)seg).getCubeDesc().getModel()); - appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName()); + appendMapReduceParameters(cmd, seg.getRealization().getDataModelDesc()); + appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName()); appendExecCmdParameters(cmd, "segmentname", seg.getName()); - appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getRealization().getName() + "_Step"); - appendExecCmdParameters(cmd, "jobflowid", jobId); + appendExecCmdParameters(cmd, "input", inputPath); + appendExecCmdParameters(cmd, "output", outputPath); + appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step"); mergeCuboidDataStep.setMapReduceParams(cmd.toString()); - mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidFromStorageJob.class); - mergeCuboidDataStep.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES); + mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidJob.class); return mergeCuboidDataStep; } http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc3883a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java index 
3ad51c5..844eb07 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java @@ -1,13 +1,23 @@ -package org.apache.kylin.engine.mr; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ -import java.io.IOException; +package org.apache.kylin.engine.mr; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.job.execution.DefaultChainedExecutable; @@ -17,93 +27,59 @@ public interface IMROutput2 { public IMRBatchCubingOutputSide2 getBatchCubingOutputSide(CubeSegment seg); /** - * Participate the batch cubing flow as the output side. + * Participate the batch cubing flow as the output side. Responsible for saving + * the cuboid output to storage at the end of Phase 3. * * - Phase 1: Create Flat Table * - Phase 2: Build Dictionary - * - Phase 3: Build Cube (with StorageOutputFormat) + * - Phase 3: Build Cube * - Phase 4: Update Metadata & Cleanup */ public interface IMRBatchCubingOutputSide2 { - /** Return an output format for Phase 3: Build Cube MR */ - public IMRStorageOutputFormat getStorageOutputFormat(); - /** Add step that executes after build dictionary and before build cube. */ public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow); - /** Add step that executes after build cube. */ - public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow); - - /** Add step that does any necessary clean up. */ - public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow); - } - - public IMRBatchMergeInputSide2 getBatchMergeInputSide(CubeSegment seg); - - public interface IMRBatchMergeInputSide2 { - public IMRStorageInputFormat getStorageInputFormat(); - } - - /** Read in a cube as input of merge. Configure the input file format of mapper. */ - @SuppressWarnings("rawtypes") - public interface IMRStorageInputFormat { - - /** Configure MR mapper class and input file format. */ - public void configureInput(Class<? extends Mapper> mapperClz, Class<? extends WritableComparable> outputKeyClz, Class<? extends Writable> outputValueClz, Job job) throws IOException; - - /** Given a mapper context, figure out which segment the mapper reads from. */ - public CubeSegment findSourceSegment(Mapper.Context context) throws IOException; - /** - * Read in a row of cuboid. Given the input KV, de-serialize back cuboid ID, dimensions, and measures. + * Add step that saves cuboids from HDFS to storage. 
* - * @return <code>ByteArrayWritable</code> is the cuboid ID (8 bytes) + dimension values in dictionary encoding - * <code>Object[]</code> is the measure values in order of <code>CubeDesc.getMeasures()</code> + * The cuboid output is a directory of sequence files, where key is CUBOID+D1+D2+..+Dn, + * value is M1+M2+..+Mm. CUBOID is 8 bytes cuboid ID; Dx is dimension value with + * dictionary encoding; Mx is measure value serialization form. */ - public Pair<ByteArrayWritable, Object[]> parseMapperInput(Object inKey, Object inValue); + public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath); + + /** Add step that does any necessary clean up. */ + public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow); } /** Return a helper to participate in batch merge job flow. */ public IMRBatchMergeOutputSide2 getBatchMergeOutputSide(CubeSegment seg); /** - * Participate the batch merge flow as the output side. + * Participate the batch cubing flow as the output side. Responsible for saving + * the cuboid output to storage at the end of Phase 2. * * - Phase 1: Merge Dictionary - * - Phase 2: Merge Cube (with StorageInputFormat & StorageOutputFormat) + * - Phase 2: Merge Cube * - Phase 3: Update Metadata & Cleanup */ public interface IMRBatchMergeOutputSide2 { - /** Return an input format for Phase 2: Merge Cube MR */ - public IMRStorageOutputFormat getStorageOutputFormat(); - /** Add step that executes after merge dictionary and before merge cube. */ public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow); - /** Add step that executes after merge cube. */ - public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow); + /** + * Add step that saves cuboid output from HDFS to storage. + * + * The cuboid output is a directory of sequence files, where key is CUBOID+D1+D2+..+Dn, + * value is M1+M2+..+Mm. CUBOID is 8 bytes cuboid ID; Dx is dimension value with + * dictionary encoding; Mx is measure value serialization form. + */ + public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath); /** Add step that does any necessary clean up. */ public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow); } - /** Write out a cube. Configure the output file format of reducer and do the actual K-V output. */ - @SuppressWarnings("rawtypes") - public interface IMRStorageOutputFormat { - - /** Configure MR reducer class and output file format. */ - public void configureOutput(Class<? extends Reducer> reducer, String jobFlowId, Job job) throws IOException; - - /** - * Write out a row of cuboid. Given the cuboid ID, dimensions, and measures, serialize in whatever - * way and output to reducer context. 
- * - * @param key The cuboid ID (8 bytes) + dimension values in dictionary encoding - * @param value The measure values in order of <code>CubeDesc.getMeasures()</code> - * @param context The reducer context output goes to - */ - public void doReducerOutput(ByteArrayWritable key, Object[] value, Reducer.Context context) throws IOException, InterruptedException; - } } http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc3883a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java index 55fa9e2..41c8b6b 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java @@ -7,12 +7,10 @@ import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat; import org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide; import org.apache.kylin.engine.mr.IMROutput.IMRBatchMergeOutputSide; import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2; -import org.apache.kylin.engine.mr.IMROutput2.IMRBatchMergeInputSide2; import org.apache.kylin.engine.mr.IMROutput2.IMRBatchMergeOutputSide2; import org.apache.kylin.invertedindex.IISegment; import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.model.TableDesc; -import org.apache.kylin.metadata.realization.IRealization; import org.apache.kylin.metadata.realization.IRealizationSegment; import org.apache.kylin.source.SourceFactory; import org.apache.kylin.storage.StorageFactory; @@ -47,10 +45,6 @@ public class MRUtil { return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchCubingOutputSide(seg); } - public static IMRBatchMergeInputSide2 getBatchMergeInputSide2(CubeSegment seg) { - return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchMergeInputSide(seg); - } - public static IMRBatchMergeOutputSide2 getBatchMergeOutputSide2(CubeSegment seg) { return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchMergeOutputSide(seg); } http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc3883a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java index f031f76..21ceff7 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java @@ -72,7 +72,6 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { protected static final Logger logger = LoggerFactory.getLogger(AbstractHadoopJob.class); protected static final Option OPTION_JOB_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Job name. For exmaple, Kylin_Cuboid_Builder-clsfd_v2_Step_22-D)").create("jobname"); - protected static final Option OPTION_JOB_FLOW_ID = OptionBuilder.withArgName("job flow ID").hasArg().isRequired(true).withDescription("job flow ID").create("jobflowid"); protected static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Cube name. 
For exmaple, flat_item_cube").create("cubename"); protected static final Option OPTION_II_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("II name. For exmaple, some_ii").create("iiname"); protected static final Option OPTION_SEGMENT_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Cube segment name)").create("segmentname"); http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc3883a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java index 8f2b810..95eb725 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java @@ -18,23 +18,57 @@ package org.apache.kylin.engine.mr.steps; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.List; +import java.util.Map; + +import javax.annotation.Nullable; + import org.apache.commons.cli.Options; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.hll.HyperLogLogPlusCounter; +import org.apache.kylin.common.persistence.ResourceStore; +import org.apache.kylin.common.util.ByteArray; +import org.apache.kylin.common.util.Bytes; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.cuboid.Cuboid; +import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.engine.mr.ByteArrayWritable; +import org.apache.kylin.engine.mr.HadoopUtil; import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat; -import org.apache.kylin.engine.mr.IMROutput2.IMRStorageOutputFormat; import org.apache.kylin.engine.mr.MRUtil; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.metadata.datatype.DataType; +import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.SegmentStatusEnum; +import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + /** */ public class InMemCuboidJob extends AbstractHadoopJob { @@ -47,13 +81,14 @@ public class InMemCuboidJob extends AbstractHadoopJob { try { options.addOption(OPTION_JOB_NAME); - options.addOption(OPTION_JOB_FLOW_ID); options.addOption(OPTION_CUBE_NAME); options.addOption(OPTION_SEGMENT_NAME); + options.addOption(OPTION_OUTPUT_PATH); parseOptions(options, args); String 
cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase(); String segmentName = getOptionValue(OPTION_SEGMENT_NAME); + String output = getOptionValue(OPTION_OUTPUT_PATH); KylinConfig config = KylinConfig.getInstanceFromEnv(); CubeManager cubeMgr = CubeManager.getInstance(config); @@ -84,9 +119,19 @@ public class InMemCuboidJob extends AbstractHadoopJob { job.setMapOutputValueClass(ByteArrayWritable.class); // set output - IMRStorageOutputFormat storageOutputFormat = MRUtil.getBatchCubingOutputSide2(cubeSeg).getStorageOutputFormat(); - storageOutputFormat.configureOutput(InMemCuboidReducer.class, getOptionValue(OPTION_JOB_FLOW_ID), job); + job.setReducerClass(InMemCuboidReducer.class); + job.setNumReduceTasks(calculateReducerNum(cubeSeg)); + + // the cuboid file and KV class must be compatible with 0.7 version for smooth upgrade + job.setOutputFormatClass(SequenceFileOutputFormat.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Text.class); + + Path outputPath = new Path(output); + FileOutputFormat.setOutputPath(job, outputPath); + HadoopUtil.deletePath(job.getConfiguration(), outputPath); + return waitForCompletion(job); } catch (Exception e) { logger.error("error in CuboidJob", e); @@ -98,6 +143,154 @@ public class InMemCuboidJob extends AbstractHadoopJob { } } + private int calculateReducerNum(CubeSegment cubeSeg) throws IOException { + Configuration jobConf = job.getConfiguration(); + KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + + Map<Long, Double> cubeSizeMap = getCubeSizeMapFromCuboidStatistics(cubeSeg, kylinConfig, jobConf); + double totalSizeInM = 0; + for (Double cuboidSize : cubeSizeMap.values()) { + totalSizeInM += cuboidSize; + } + + double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB(); + + // number of reduce tasks + int numReduceTasks = (int) Math.round(totalSizeInM / perReduceInputMB); + + // at least 1 reducer + numReduceTasks = Math.max(1, numReduceTasks); + // no more than 5000 reducer by default + numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks); + + logger.info("Having total map input MB " + Math.round(totalSizeInM)); + logger.info("Having per reduce MB " + perReduceInputMB); + logger.info("Setting " + "mapred.reduce.tasks" + "=" + numReduceTasks); + return numReduceTasks; + } + + public static Map<Long, Long> getCubeRowCountMapFromCuboidStatistics(CubeSegment cubeSegment, KylinConfig kylinConfig, Configuration conf) throws IOException { + ResourceStore rs = ResourceStore.getStore(kylinConfig); + String fileKey = cubeSegment.getStatisticsResourcePath(); + InputStream is = rs.getResource(fileKey).inputStream; + File tempFile = null; + FileOutputStream tempFileStream = null; + try { + tempFile = File.createTempFile(cubeSegment.getUuid(), ".seq"); + tempFileStream = new FileOutputStream(tempFile); + org.apache.commons.io.IOUtils.copy(is, tempFileStream); + } finally { + IOUtils.closeStream(is); + IOUtils.closeStream(tempFileStream); + } + Map<Long, HyperLogLogPlusCounter> counterMap = Maps.newHashMap(); + + FileSystem fs = HadoopUtil.getFileSystem("file:///" + tempFile.getAbsolutePath()); + int samplingPercentage = 25; + SequenceFile.Reader reader = null; + try { + reader = new SequenceFile.Reader(fs, new Path(tempFile.getAbsolutePath()), conf); + LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf); + BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf); + while (reader.next(key, value)) { + if 
(key.get() == 0L) { + samplingPercentage = Bytes.toInt(value.getBytes()); + } else { + HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(14); + ByteArray byteArray = new ByteArray(value.getBytes()); + hll.readRegisters(byteArray.asBuffer()); + counterMap.put(key.get(), hll); + } + + } + } catch (Exception e) { + e.printStackTrace(); + throw e; + } finally { + IOUtils.closeStream(reader); + tempFile.delete(); + } + return getCubeRowCountMapFromCuboidStatistics(counterMap, samplingPercentage); + } + + public static Map<Long, Long> getCubeRowCountMapFromCuboidStatistics(Map<Long, HyperLogLogPlusCounter> counterMap, final int samplingPercentage) throws IOException { + Preconditions.checkArgument(samplingPercentage > 0); + return Maps.transformValues(counterMap, new Function<HyperLogLogPlusCounter, Long>() { + @Nullable + @Override + public Long apply(HyperLogLogPlusCounter input) { + return input.getCountEstimate() * 100 / samplingPercentage; + } + }); + } + + // return map of Cuboid ID => MB + public static Map<Long, Double> getCubeSizeMapFromCuboidStatistics(CubeSegment cubeSegment, KylinConfig kylinConfig, Configuration conf) throws IOException { + Map<Long, Long> rowCountMap = getCubeRowCountMapFromCuboidStatistics(cubeSegment, kylinConfig, conf); + Map<Long, Double> sizeMap = getCubeSizeMapFromRowCount(cubeSegment, rowCountMap); + return sizeMap; + } + + public static Map<Long, Double> getCubeSizeMapFromRowCount(CubeSegment cubeSegment, Map<Long, Long> rowCountMap) { + final CubeDesc cubeDesc = cubeSegment.getCubeDesc(); + final List<Integer> rowkeyColumnSize = Lists.newArrayList(); + final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); + final Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId); + final List<TblColRef> columnList = baseCuboid.getColumns(); + + for (int i = 0; i < columnList.size(); i++) { + rowkeyColumnSize.add(cubeSegment.getColumnLength(columnList.get(i))); + } + + Map<Long, Double> sizeMap = Maps.newHashMap(); + for (Map.Entry<Long, Long> entry : rowCountMap.entrySet()) { + sizeMap.put(entry.getKey(), estimateCuboidStorageSize(cubeSegment, entry.getKey(), entry.getValue(), baseCuboidId, rowkeyColumnSize)); + } + return sizeMap; + } + + /** + * Estimate the cuboid's size + * + * @return the cuboid size in M bytes + */ + private static double estimateCuboidStorageSize(CubeSegment cubeSegment, long cuboidId, long rowCount, long baseCuboidId, List<Integer> rowKeyColumnLength) { + + int bytesLength = cubeSegment.getRowKeyPreambleSize(); + + long mask = Long.highestOneBit(baseCuboidId); + long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(baseCuboidId); + for (int i = 0; i < parentCuboidIdActualLength; i++) { + if ((mask & cuboidId) > 0) { + bytesLength += rowKeyColumnLength.get(i); //colIO.getColumnLength(columnList.get(i)); + } + mask = mask >> 1; + } + + // add the measure length + int space = 0; + boolean isMemoryHungry = false; + for (MeasureDesc measureDesc : cubeSegment.getCubeDesc().getMeasures()) { + if (measureDesc.getFunction().getMeasureType().isMemoryHungry()) { + isMemoryHungry = true; + } + DataType returnType = measureDesc.getFunction().getReturnDataType(); + space += returnType.getStorageBytesEstimate(); + } + bytesLength += space; + + double ret = 1.0 * bytesLength * rowCount / (1024L * 1024L); + if (isMemoryHungry) { + logger.info("Cube is memory hungry, storage size estimation multiply 0.05"); + ret *= 0.05; + } else { + logger.info("Cube is not memory hungry, storage size estimation multiply 0.25"); + ret *= 
0.25; + } + logger.info("Cuboid " + cuboidId + " has " + rowCount + " rows, each row size is " + bytesLength + " bytes." + " Total size is " + ret + "M."); + return ret; + } + public static void main(String[] args) throws Exception { InMemCuboidJob job = new InMemCuboidJob(); int exitCode = ToolRunner.run(job, args); http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc3883a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java index c35e77f..9beacbb 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java @@ -1,17 +1,18 @@ package org.apache.kylin.engine.mr.steps; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.List; +import org.apache.hadoop.io.Text; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.engine.mr.ByteArrayWritable; -import org.apache.kylin.engine.mr.IMROutput2.IMRStorageOutputFormat; import org.apache.kylin.engine.mr.KylinReducer; -import org.apache.kylin.engine.mr.MRUtil; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.measure.MeasureAggregators; @@ -27,13 +28,16 @@ public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArra private static final Logger logger = LoggerFactory.getLogger(InMemCuboidReducer.class); - private IMRStorageOutputFormat storageOutputFormat; private MeasureCodec codec; private MeasureAggregators aggs; private int counter; private Object[] input; private Object[] result; + + private Text outputKey; + private Text outputValue; + private ByteBuffer valueBuf; @Override protected void setup(Context context) throws IOException { @@ -47,16 +51,16 @@ public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArra CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName); CubeDesc cubeDesc = cube.getDescriptor(); CubeSegment cubeSeg = cube.getSegment(segmentName, SegmentStatusEnum.NEW); - if (isMerge) - storageOutputFormat = MRUtil.getBatchMergeOutputSide2(cubeSeg).getStorageOutputFormat(); - else - storageOutputFormat = MRUtil.getBatchCubingOutputSide2(cubeSeg).getStorageOutputFormat(); List<MeasureDesc> measuresDescs = cubeDesc.getMeasures(); codec = new MeasureCodec(measuresDescs); aggs = new MeasureAggregators(measuresDescs); input = new Object[measuresDescs.size()]; result = new Object[measuresDescs.size()]; + + outputKey = new Text(); + outputValue = new Text(); + valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); } @Override @@ -70,8 +74,16 @@ public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArra } aggs.collectStates(result); - storageOutputFormat.doReducerOutput(key, result, context); + // output key + outputKey.set(key.array(), key.offset(), key.length()); + // output value + valueBuf.clear(); + codec.encode(result, valueBuf); + outputValue.set(valueBuf.array(), 0, valueBuf.position()); + + context.write(outputKey, 
outputValue); + counter++; if (counter % BatchConstants.COUNTER_MAX == 0) { logger.info("Handled " + counter + " records!"); http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc3883a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageJob.java deleted file mode 100644 index 4485d17..0000000 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageJob.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.engine.mr.steps; - -import org.apache.commons.cli.Options; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.Job; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.engine.mr.ByteArrayWritable; -import org.apache.kylin.engine.mr.IMROutput2.IMRStorageInputFormat; -import org.apache.kylin.engine.mr.IMROutput2.IMRStorageOutputFormat; -import org.apache.kylin.engine.mr.MRUtil; -import org.apache.kylin.engine.mr.common.BatchConstants; -import org.apache.kylin.metadata.model.SegmentStatusEnum; - -/** - */ -public class MergeCuboidFromStorageJob extends CuboidJob { - - @Override - public int run(String[] args) throws Exception { - Options options = new Options(); - - try { - options.addOption(OPTION_JOB_NAME); - options.addOption(OPTION_JOB_FLOW_ID); - options.addOption(OPTION_CUBE_NAME); - options.addOption(OPTION_SEGMENT_NAME); - parseOptions(options, args); - - String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase(); - String segmentName = getOptionValue(OPTION_SEGMENT_NAME).toUpperCase(); - KylinConfig config = KylinConfig.getInstanceFromEnv(); - CubeManager cubeMgr = CubeManager.getInstance(config); - CubeInstance cube = cubeMgr.getCube(cubeName); - CubeSegment cubeSeg = cube.getSegment(segmentName, SegmentStatusEnum.NEW); - - Configuration conf = this.getConf(); - - // start job - String jobName = getOptionValue(OPTION_JOB_NAME); - System.out.println("Starting: " + jobName); - job = Job.getInstance(conf, jobName); - - setJobClasspath(job); - - // add metadata to distributed cache - attachKylinPropsAndMetadata(cube, job.getConfiguration()); - - // set job configuration - job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName); - job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName); - job.getConfiguration().set(BatchConstants.CFG_IS_MERGE, "true"); - 
- // configure mapper input - IMRStorageInputFormat storageInputFormat = MRUtil.getBatchMergeInputSide2(cubeSeg).getStorageInputFormat(); - storageInputFormat.configureInput(MergeCuboidFromStorageMapper.class, ByteArrayWritable.class, ByteArrayWritable.class, job); - - // configure reducer output - IMRStorageOutputFormat storageOutputFormat = MRUtil.getBatchMergeOutputSide2(cubeSeg).getStorageOutputFormat(); - storageOutputFormat.configureOutput(InMemCuboidReducer.class, getOptionValue(OPTION_JOB_FLOW_ID), job); - - return waitForCompletion(job); - } catch (Exception e) { - logger.error("error in MergeCuboidFromHBaseJob", e); - printUsage(options); - throw e; - } finally { - if (job != null) - cleanupTempConfFile(job.getConfiguration()); - } - - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc3883a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java deleted file mode 100644 index 18bce34..0000000 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java +++ /dev/null @@ -1,239 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
-*/ - -package org.apache.kylin.engine.mr.steps; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.ByteArray; -import org.apache.kylin.common.util.BytesUtil; -import org.apache.kylin.common.util.Dictionary; -import org.apache.kylin.common.util.Pair; -import org.apache.kylin.common.util.SplittedBytes; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.common.RowKeySplitter; -import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.cube.kv.RowConstants; -import org.apache.kylin.cube.kv.RowKeyEncoder; -import org.apache.kylin.cube.kv.RowKeyEncoderProvider; -import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.dict.DictionaryManager; -import org.apache.kylin.engine.mr.ByteArrayWritable; -import org.apache.kylin.engine.mr.IMROutput2.IMRStorageInputFormat; -import org.apache.kylin.engine.mr.KylinMapper; -import org.apache.kylin.engine.mr.MRUtil; -import org.apache.kylin.engine.mr.common.AbstractHadoopJob; -import org.apache.kylin.engine.mr.common.BatchConstants; -import org.apache.kylin.measure.MeasureCodec; -import org.apache.kylin.measure.MeasureIngester; -import org.apache.kylin.measure.MeasureType; -import org.apache.kylin.metadata.model.MeasureDesc; -import org.apache.kylin.metadata.model.SegmentStatusEnum; -import org.apache.kylin.metadata.model.TblColRef; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; - -/** - * @author shaoshi - */ -@SuppressWarnings({ "rawtypes", "unchecked" }) -public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, ByteArrayWritable, ByteArrayWritable> { - - private static final Logger logger = LoggerFactory.getLogger(MergeCuboidFromStorageMapper.class); - - private KylinConfig config; - private String cubeName; - private String segmentName; - private CubeManager cubeManager; - private CubeInstance cube; - private CubeDesc cubeDesc; - private CubeSegment mergedCubeSegment; - private CubeSegment sourceCubeSegment; // Must be unique during a mapper's life cycle - private IMRStorageInputFormat storageInputFormat; - - private ByteArrayWritable outputKey = new ByteArrayWritable(); - private byte[] newKeyBodyBuf; - private ByteArray newKeyBuf; - private RowKeySplitter rowKeySplitter; - private RowKeyEncoderProvider rowKeyEncoderProvider; - - private HashMap<TblColRef, Boolean> dimensionsNeedDict = new HashMap<TblColRef, Boolean>(); - - private List<MeasureDesc> measureDescs; - private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); - private MeasureCodec codec; - private ByteArrayWritable outputValue = new ByteArrayWritable(); - - private List<Pair<Integer, MeasureIngester>> dictMeasures; - private Map<TblColRef, Dictionary<String>> oldDicts; - private Map<TblColRef, Dictionary<String>> newDicts; - - @Override - protected void setup(Context context) throws IOException, InterruptedException { - super.bindCurrentConfiguration(context.getConfiguration()); - config = AbstractHadoopJob.loadKylinPropsAndMetadata(); - - cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase(); - segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME).toUpperCase(); - - cubeManager = 
CubeManager.getInstance(config); - cube = cubeManager.getCube(cubeName); - cubeDesc = cube.getDescriptor(); - mergedCubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW); - storageInputFormat = MRUtil.getBatchMergeInputSide2(mergedCubeSegment).getStorageInputFormat(); - - newKeyBodyBuf = new byte[RowConstants.ROWKEY_BUFFER_SIZE]; // size will auto-grow - newKeyBuf = ByteArray.allocate(RowConstants.ROWKEY_BUFFER_SIZE); - - sourceCubeSegment = storageInputFormat.findSourceSegment(context); - logger.info("Source cube segment: " + sourceCubeSegment); - - rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255); - rowKeyEncoderProvider = new RowKeyEncoderProvider(mergedCubeSegment); - - measureDescs = cubeDesc.getMeasures(); - codec = new MeasureCodec(measureDescs); - - dictMeasures = Lists.newArrayList(); - for (int i = 0; i < measureDescs.size(); i++) { - MeasureDesc measureDesc = measureDescs.get(i); - MeasureType measureType = measureDesc.getFunction().getMeasureType(); - if (measureType.getColumnsNeedDictionary(measureDesc.getFunction()).isEmpty() == false) { - dictMeasures.add(Pair.newPair(i, measureType.newIngester())); - } - } - if (dictMeasures.size() > 0) { - oldDicts = sourceCubeSegment.buildDictionaryMap(); - newDicts = mergedCubeSegment.buildDictionaryMap(); - } - } - - @Override - public void map(Object inKey, Object inValue, Context context) throws IOException, InterruptedException { - Pair<ByteArrayWritable, Object[]> pair = storageInputFormat.parseMapperInput(inKey, inValue); - ByteArrayWritable key = pair.getFirst(); - Object[] value = pair.getSecond(); - - Preconditions.checkState(key.offset() == 0); - - long cuboidID = rowKeySplitter.split(key.array()); - Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidID); - RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(cuboid); - - SplittedBytes[] splittedByteses = rowKeySplitter.getSplitBuffers(); - int bufOffset = 0; - int bodySplitOffset = rowKeySplitter.getBodySplitOffset(); - - for (int i = 0; i < cuboid.getColumns().size(); ++i) { - int useSplit = i + bodySplitOffset; - TblColRef col = cuboid.getColumns().get(i); - - if (this.checkNeedMerging(col)) { - // if dictionary on fact table column, needs rewrite - DictionaryManager dictMgr = DictionaryManager.getInstance(config); - Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(col)); - Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(col)); - - while (sourceDict.getSizeOfValue() > newKeyBodyBuf.length - bufOffset || // - mergedDict.getSizeOfValue() > newKeyBodyBuf.length - bufOffset || // - mergedDict.getSizeOfId() > newKeyBodyBuf.length - bufOffset) { - //also use this buf to hold value before translating - byte[] oldBuf = newKeyBodyBuf; - newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length]; - System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length); - } - - int idInSourceDict = BytesUtil.readUnsigned(splittedByteses[useSplit].value, 0, splittedByteses[useSplit].length); - int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset); - - int idInMergedDict; - if (size < 0) { - idInMergedDict = mergedDict.nullId(); - } else { - idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBodyBuf, bufOffset, size); - } - BytesUtil.writeUnsigned(idInMergedDict, newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId()); - - bufOffset += mergedDict.getSizeOfId(); - } else { - // keep as it is - while (splittedByteses[useSplit].length > newKeyBodyBuf.length - bufOffset) { - 
byte[] oldBuf = newKeyBodyBuf; - newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length]; - System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length); - } - - System.arraycopy(splittedByteses[useSplit].value, 0, newKeyBodyBuf, bufOffset, splittedByteses[useSplit].length); - bufOffset += splittedByteses[useSplit].length; - } - } - - int fullKeySize = rowkeyEncoder.getBytesLength(); - while (newKeyBuf.array().length < fullKeySize) { - newKeyBuf.set(new byte[newKeyBuf.length() * 2]); - } - newKeyBuf.set(0, fullKeySize); - - rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, bufOffset), newKeyBuf); - outputKey.set(newKeyBuf.array(), 0, fullKeySize); - - // re-encode measures if dictionary is used - if (dictMeasures.size() > 0) { - reEncodeMeasure(value); - } - - valueBuf.clear(); - codec.encode(value, valueBuf); - outputValue.set(valueBuf.array(), 0, valueBuf.position()); - - context.write(outputKey, outputValue); - } - - private Boolean checkNeedMerging(TblColRef col) throws IOException { - Boolean ret = dimensionsNeedDict.get(col); - if (ret != null) - return ret; - else { - ret = cubeDesc.getRowkey().isUseDictionary(col); - if (ret) { - String dictTable = DictionaryManager.getInstance(config).decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().isUseDictionary(col), col).getTable(); - ret = cubeDesc.getFactTable().equalsIgnoreCase(dictTable); - } - dimensionsNeedDict.put(col, ret); - return ret; - } - } - - private void reEncodeMeasure(Object[] measureObjs) throws IOException, InterruptedException { - for (Pair<Integer, MeasureIngester> pair : dictMeasures) { - int i = pair.getFirst(); - MeasureIngester ingester = pair.getSecond(); - measureObjs[i] = ingester.reEncodeDictionary(measureObjs[i], measureDescs.get(i), oldDicts, newDicts); - } - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc3883a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java index be81c2b..59a19d3 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java @@ -56,6 +56,7 @@ import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.cube.model.*; import org.apache.kylin.cube.util.CubingUtils; import org.apache.kylin.dict.*; +import org.apache.kylin.engine.mr.steps.InMemCuboidJob; import org.apache.kylin.engine.spark.cube.BufferedCuboidWriter; import org.apache.kylin.engine.spark.cube.DefaultTupleConverter; import org.apache.kylin.engine.spark.util.IteratorUtils; @@ -453,7 +454,8 @@ public class SparkCubing extends AbstractApplication { final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName); final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId); - final Map<Long, Long> cubeSizeMap = CreateHTableJob.getCubeRowCountMapFromCuboidStatistics(samplingResult, 100); + final Map<Long, Long> rowCountMap = InMemCuboidJob.getCubeRowCountMapFromCuboidStatistics(samplingResult, 100); + final Map<Long, Double> cubeSizeMap = InMemCuboidJob.getCubeSizeMapFromRowCount(cubeSegment, rowCountMap); System.out.println("cube size estimation:" + cubeSizeMap); final byte[][] splitKeys = CreateHTableJob.getSplitsFromCuboidStatistics(cubeSizeMap, 
kylinConfig, cubeSegment); CubeHTableUtil.createHTable(cubeSegment, splitKeys); http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc3883a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java index 4fac4fc..85c9200 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java @@ -18,26 +18,19 @@ package org.apache.kylin.storage.hbase.steps; -import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; -import java.io.InputStream; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import javax.annotation.Nullable; - import org.apache.commons.cli.Options; import org.apache.commons.math3.primes.Primes; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; @@ -45,31 +38,22 @@ import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.hll.HyperLogLogPlusCounter; -import org.apache.kylin.common.persistence.ResourceStore; -import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.ShardingHash; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.engine.mr.HadoopUtil; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.CuboidShardUtil; -import org.apache.kylin.metadata.datatype.DataType; +import org.apache.kylin.engine.mr.steps.InMemCuboidJob; import org.apache.kylin.metadata.model.DataModelDesc; -import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.SegmentStatusEnum; -import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.storage.hbase.HBaseConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Function; -import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -110,7 +94,7 @@ public class CreateHTableJob extends AbstractHadoopJob { try { byte[][] splitKeys; if (statsEnabled) { - final Map<Long, Long> cuboidSizeMap = getCubeRowCountMapFromCuboidStatistics(cubeSegment, kylinConfig, conf); + final Map<Long, Double> cuboidSizeMap = InMemCuboidJob.getCubeSizeMapFromCuboidStatistics(cubeSegment, kylinConfig, conf); splitKeys = getSplitsFromCuboidStatistics(cuboidSizeMap, kylinConfig, cubeSegment); } else { splitKeys = getSplits(conf, partitionFilePath); @@ -160,50 +144,6 @@ public class CreateHTableJob extends AbstractHadoopJob { 
return retValue.length == 0 ? null : retValue; } - public static Map<Long, Long> getCubeRowCountMapFromCuboidStatistics(CubeSegment cubeSegment, KylinConfig kylinConfig, Configuration conf) throws IOException { - ResourceStore rs = ResourceStore.getStore(kylinConfig); - String fileKey = cubeSegment.getStatisticsResourcePath(); - InputStream is = rs.getResource(fileKey).inputStream; - File tempFile = null; - FileOutputStream tempFileStream = null; - try { - tempFile = File.createTempFile(cubeSegment.getUuid(), ".seq"); - tempFileStream = new FileOutputStream(tempFile); - org.apache.commons.io.IOUtils.copy(is, tempFileStream); - } finally { - IOUtils.closeStream(is); - IOUtils.closeStream(tempFileStream); - } - Map<Long, HyperLogLogPlusCounter> counterMap = Maps.newHashMap(); - - FileSystem fs = HadoopUtil.getFileSystem("file:///" + tempFile.getAbsolutePath()); - int samplingPercentage = 25; - SequenceFile.Reader reader = null; - try { - reader = new SequenceFile.Reader(fs, new Path(tempFile.getAbsolutePath()), conf); - LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf); - BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf); - while (reader.next(key, value)) { - if (key.get() == 0L) { - samplingPercentage = Bytes.toInt(value.getBytes()); - } else { - HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(14); - ByteArray byteArray = new ByteArray(value.getBytes()); - hll.readRegisters(byteArray.asBuffer()); - counterMap.put(key.get(), hll); - } - - } - } catch (Exception e) { - e.printStackTrace(); - throw e; - } finally { - IOUtils.closeStream(reader); - tempFile.delete(); - } - return getCubeRowCountMapFromCuboidStatistics(counterMap, samplingPercentage); - } - //one region for one shard private static byte[][] getSplitsByRegionCount(int regionCount) { byte[][] result = new byte[regionCount - 1][]; @@ -215,51 +155,24 @@ public class CreateHTableJob extends AbstractHadoopJob { return result; } - public static Map<Long, Long> getCubeRowCountMapFromCuboidStatistics(Map<Long, HyperLogLogPlusCounter> counterMap, final int samplingPercentage) throws IOException { - Preconditions.checkArgument(samplingPercentage > 0); - return Maps.transformValues(counterMap, new Function<HyperLogLogPlusCounter, Long>() { - @Nullable - @Override - public Long apply(HyperLogLogPlusCounter input) { - return input.getCountEstimate() * 100 / samplingPercentage; - } - }); - } - - public static byte[][] getSplitsFromCuboidStatistics(final Map<Long, Long> cubeRowCountMap, KylinConfig kylinConfig, CubeSegment cubeSegment) throws IOException { + public static byte[][] getSplitsFromCuboidStatistics(final Map<Long, Double> cubeSizeMap, KylinConfig kylinConfig, CubeSegment cubeSegment) throws IOException { final CubeDesc cubeDesc = cubeSegment.getCubeDesc(); - final List<Integer> rowkeyColumnSize = Lists.newArrayList(); - final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); - Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId); - List<TblColRef> columnList = baseCuboid.getColumns(); - - for (int i = 0; i < columnList.size(); i++) { - logger.info("Rowkey column " + i + " length " + cubeSegment.getColumnLength(columnList.get(i))); - rowkeyColumnSize.add(cubeSegment.getColumnLength(columnList.get(i))); - } - DataModelDesc.RealizationCapacity cubeCapacity = cubeDesc.getModel().getCapacity(); int cut = kylinConfig.getHBaseRegionCut(cubeCapacity.toString()); logger.info("Cube capacity " + cubeCapacity.toString() + ", chosen cut for 
HTable is " + cut + "GB"); double totalSizeInM = 0; - - List<Long> allCuboids = Lists.newArrayList(); - allCuboids.addAll(cubeRowCountMap.keySet()); - Collections.sort(allCuboids); - - Map<Long, Double> cubeSizeMap = Maps.newHashMap(); - for (Map.Entry<Long, Long> entry : cubeRowCountMap.entrySet()) { - cubeSizeMap.put(entry.getKey(), estimateCuboidStorageSize(cubeSegment, entry.getKey(), entry.getValue(), baseCuboidId, rowkeyColumnSize)); - } - for (Double cuboidSize : cubeSizeMap.values()) { totalSizeInM += cuboidSize; } + List<Long> allCuboids = Lists.newArrayList(); + allCuboids.addAll(cubeSizeMap.keySet()); + Collections.sort(allCuboids); + int nRegion = Math.round((float) (totalSizeInM / (cut * 1024L))); nRegion = Math.max(kylinConfig.getHBaseRegionCountMin(), nRegion); nRegion = Math.min(kylinConfig.getHBaseRegionCountMax(), nRegion); @@ -350,48 +263,6 @@ public class CreateHTableJob extends AbstractHadoopJob { } } - /** - * Estimate the cuboid's size - * - * @return the cuboid size in M bytes - */ - private static double estimateCuboidStorageSize(CubeSegment cubeSegment, long cuboidId, long rowCount, long baseCuboidId, List<Integer> rowKeyColumnLength) { - - int bytesLength = cubeSegment.getRowKeyPreambleSize(); - - long mask = Long.highestOneBit(baseCuboidId); - long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(baseCuboidId); - for (int i = 0; i < parentCuboidIdActualLength; i++) { - if ((mask & cuboidId) > 0) { - bytesLength += rowKeyColumnLength.get(i); //colIO.getColumnLength(columnList.get(i)); - } - mask = mask >> 1; - } - - // add the measure length - int space = 0; - boolean isMemoryHungry = false; - for (MeasureDesc measureDesc : cubeSegment.getCubeDesc().getMeasures()) { - if (measureDesc.getFunction().getMeasureType().isMemoryHungry()) { - isMemoryHungry = true; - } - DataType returnType = measureDesc.getFunction().getReturnDataType(); - space += returnType.getStorageBytesEstimate(); - } - bytesLength += space; - - double ret = 1.0 * bytesLength * rowCount / (1024L * 1024L); - if (isMemoryHungry) { - logger.info("Cube is memory hungry, storage size estimation multiply 0.05"); - ret *= 0.05; - } else { - logger.info("Cube is not memory hungry, storage size estimation multiply 0.25"); - ret *= 0.25; - } - logger.info("Cuboid " + cuboidId + " has " + rowCount + " rows, each row size is " + bytesLength + " bytes." + " Total size is " + ret + "M."); - return ret; - } - public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new CreateHTableJob(), args); System.exit(exitCode); http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc3883a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java deleted file mode 100644 index 397f4fe..0000000 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java +++ /dev/null @@ -1,290 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.storage.hbase.steps; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat; -import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; -import org.apache.hadoop.hbase.mapreduce.TableMapper; -import org.apache.hadoop.hbase.mapreduce.TableSplit; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.SequenceFile.Reader; -import org.apache.hadoop.io.SequenceFile.Writer; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.Mapper.Context; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner; -import org.apache.kylin.common.util.Pair; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.model.HBaseColumnDesc; -import org.apache.kylin.cube.model.HBaseColumnFamilyDesc; -import org.apache.kylin.engine.mr.ByteArrayWritable; -import org.apache.kylin.engine.mr.HadoopUtil; -import org.apache.kylin.engine.mr.IMROutput2; -import org.apache.kylin.job.execution.DefaultChainedExecutable; -import org.apache.kylin.metadata.model.MeasureDesc; - -import com.google.common.collect.Lists; - -public class HBaseMROutput2 implements IMROutput2 { - - @Override - public IMRBatchCubingOutputSide2 getBatchCubingOutputSide(final CubeSegment seg) { - return new IMRBatchCubingOutputSide2() { - HBaseMRSteps steps = new HBaseMRSteps(seg); - - @Override - public IMRStorageOutputFormat getStorageOutputFormat() { - return new HBaseOutputFormat(seg); - } - - @Override - public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow) { - jobFlow.addTask(steps.createCreateHTableStepWithStats(jobFlow.getId())); - } - - @Override - public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) { - jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId())); - } - - @Override - public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) { - // nothing to do - } - }; - } - - @Override - public IMRBatchMergeInputSide2 getBatchMergeInputSide(final CubeSegment seg) { - return new IMRBatchMergeInputSide2() { - @Override - public IMRStorageInputFormat getStorageInputFormat() { - return new HBaseInputFormat(seg); - } - }; - } - - @Override - public IMRBatchMergeOutputSide2 getBatchMergeOutputSide(final CubeSegment seg) { - return new IMRBatchMergeOutputSide2() { - 
HBaseMRSteps steps = new HBaseMRSteps(seg); - - @Override - public IMRStorageOutputFormat getStorageOutputFormat() { - return new HBaseOutputFormat(seg); - } - - @Override - public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) { - jobFlow.addTask(steps.createCreateHTableStepWithStats(jobFlow.getId())); - } - - @Override - public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow) { - jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId())); - } - - @Override - public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow) { - jobFlow.addTask(steps.createMergeGCStep()); - } - }; - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - private static class HBaseInputFormat implements IMRStorageInputFormat { - final CubeSegment seg; - - final RowValueDecoder[] rowValueDecoders; - final ByteArrayWritable parsedKey; - final Object[] parsedValue; - final Pair<ByteArrayWritable, Object[]> parsedPair; - - public HBaseInputFormat(CubeSegment seg) { - this.seg = seg; - - List<RowValueDecoder> valueDecoderList = Lists.newArrayList(); - List<MeasureDesc> measuresDescs = Lists.newArrayList(); - for (HBaseColumnFamilyDesc cfDesc : seg.getCubeDesc().getHbaseMapping().getColumnFamily()) { - for (HBaseColumnDesc colDesc : cfDesc.getColumns()) { - valueDecoderList.add(new RowValueDecoder(colDesc)); - for (MeasureDesc measure : colDesc.getMeasures()) { - measuresDescs.add(measure); - } - } - } - this.rowValueDecoders = valueDecoderList.toArray(new RowValueDecoder[valueDecoderList.size()]); - - this.parsedKey = new ByteArrayWritable(); - this.parsedValue = new Object[measuresDescs.size()]; - this.parsedPair = Pair.newPair(parsedKey, parsedValue); - } - - @Override - public void configureInput(Class<? extends Mapper> mapperClz, Class<? extends WritableComparable> outputKeyClz, Class<? extends Writable> outputValueClz, Job job) throws IOException { - Configuration conf = job.getConfiguration(); - HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf)); - - List<Scan> scans = new ArrayList<Scan>(); - for (String htable : new HBaseMRSteps(seg).getMergingHTables()) { - Scan scan = new Scan(); - scan.setCaching(512); // 1 is the default in Scan, which will be bad for MapReduce jobs - scan.setCacheBlocks(false); // don't set to true for MR jobs - scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(htable)); - scans.add(scan); - } - - TableMapReduceUtil.initTableMapperJob(scans, (Class<? 
extends TableMapper>) mapperClz, outputKeyClz, outputValueClz, job); - TableMapReduceUtil.initCredentials(job); - - } - - @Override - public CubeSegment findSourceSegment(Context context) throws IOException { - TableSplit currentSplit = (TableSplit) context.getInputSplit(); - byte[] tableName = currentSplit.getTableName(); - String htableName = Bytes.toString(tableName); - - // decide which source segment - for (CubeSegment segment : seg.getCubeInstance().getSegments()) { - String segmentHtable = segment.getStorageLocationIdentifier(); - if (segmentHtable != null && segmentHtable.equalsIgnoreCase(htableName)) { - return segment; - } - } - throw new IllegalStateException("No merging segment's storage location identifier equals " + htableName); - } - - @Override - public Pair<ByteArrayWritable, Object[]> parseMapperInput(Object inKey, Object inValue) { - ImmutableBytesWritable key = (ImmutableBytesWritable) inKey; - parsedKey.set(key.get(), key.getOffset(), key.getLength()); - - Result hbaseRow = (Result) inValue; - for (int i = 0; i < rowValueDecoders.length; i++) { - rowValueDecoders[i].decode(hbaseRow); - rowValueDecoders[i].loadCubeMeasureArray(parsedValue); - } - - return parsedPair; - } - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - private static class HBaseOutputFormat implements IMRStorageOutputFormat { - final CubeSegment seg; - - final List<KeyValueCreator> keyValueCreators = Lists.newArrayList(); - final ImmutableBytesWritable outputKey = new ImmutableBytesWritable(); - - public HBaseOutputFormat(CubeSegment seg) { - this.seg = seg; - } - - @Override - public void configureOutput(Class<? extends Reducer> reducer, String jobFlowId, Job job) throws IOException { - Path hfilePath = new Path(new HBaseMRSteps(seg).getHFilePath(jobFlowId)); - FileOutputFormat.setOutputPath(job, hfilePath); - - String htableName = seg.getStorageLocationIdentifier(); - Configuration conf = HBaseConfiguration.create(job.getConfiguration()); - HTable htable = new HTable(conf, htableName); - HFileOutputFormat.configureIncrementalLoad(job, htable); - - // set Reducer; This need be after configureIncrementalLoad, to overwrite the default reducer class - job.setReducerClass(reducer); - - // kylin uses ByteArrayWritable instead of ImmutableBytesWritable as mapper output key - rewriteTotalOrderPartitionerFile(job); - - HadoopUtil.deletePath(job.getConfiguration(), hfilePath); - } - - @Override - public void doReducerOutput(ByteArrayWritable key, Object[] value, Reducer.Context context) throws IOException, InterruptedException { - if (keyValueCreators.size() == 0) { - for (HBaseColumnFamilyDesc cfDesc : seg.getCubeDesc().getHbaseMapping().getColumnFamily()) { - for (HBaseColumnDesc colDesc : cfDesc.getColumns()) { - keyValueCreators.add(new KeyValueCreator(seg.getCubeDesc(), colDesc)); - } - } - } - - outputKey.set(key.array(), key.offset(), key.length()); - - KeyValue outputValue; - for (int i = 0; i < keyValueCreators.size(); i++) { - outputValue = keyValueCreators.get(i).create(key.array(), key.offset(), key.length(), value); - context.write(outputKey, outputValue); - } - } - - private void rewriteTotalOrderPartitionerFile(Job job) throws IOException { - Configuration conf = job.getConfiguration(); - String partitionsFile = TotalOrderPartitioner.getPartitionFile(conf); - if (StringUtils.isBlank(partitionsFile)) - throw new IllegalStateException("HFileOutputFormat.configureIncrementalLoad don't configure TotalOrderPartitioner any more?"); - - Path partitionsPath = new Path(partitionsFile); - - // read 
in partition file in ImmutableBytesWritable - List<ByteArrayWritable> keys = Lists.newArrayList(); - Reader reader = new SequenceFile.Reader(conf, Reader.file(partitionsPath)); - try { - ImmutableBytesWritable key = new ImmutableBytesWritable(); - while (reader.next(key, NullWritable.get())) { - keys.add(new ByteArrayWritable(key.copyBytes())); - } - } finally { - reader.close(); - } - - // write out again in ByteArrayWritable - Writer writer = SequenceFile.createWriter(conf, Writer.file(partitionsPath), Writer.keyClass(ByteArrayWritable.class), Writer.valueClass(NullWritable.class)); - try { - for (ByteArrayWritable key : keys) { - writer.append(key, NullWritable.get()); - } - } finally { - writer.close(); - } - } - - } - -}
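----------------------------------------------------------------------
Note on the SparkCubing change above: the split calculation no longer consumes raw row counts. CreateHTableJob.getSplitsFromCuboidStatistics now expects per-cuboid size estimates in MB, so SparkCubing first derives row counts from the sampled statistics and then converts them to sizes, both via helpers on InMemCuboidJob. A minimal sketch of the resulting call order, with the surrounding variables (samplingResult, cubeSegment, kylinConfig) assumed from the diff context:

    // Inside SparkCubing, after sampling has produced per-cuboid HLL counters (samplingResult).
    Map<Long, Long> rowCountMap =
            InMemCuboidJob.getCubeRowCountMapFromCuboidStatistics(samplingResult, 100);   // 100% sampling
    Map<Long, Double> cubeSizeMap =
            InMemCuboidJob.getCubeSizeMapFromRowCount(cubeSegment, rowCountMap);          // MB per cuboid
    byte[][] splitKeys =
            CreateHTableJob.getSplitsFromCuboidStatistics(cubeSizeMap, kylinConfig, cubeSegment);
    CubeHTableUtil.createHTable(cubeSegment, splitKeys);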
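The row-count helper that moved out of CreateHTableJob boils down to scaling each cuboid's HyperLogLog estimate by the sampling percentage recorded under key 0 of the statistics sequence file. A minimal sketch of that scaling, assuming a counterMap of per-cuboid HyperLogLogPlusCounter values (the class and method names below are illustrative, not the committed ones):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kylin.common.hll.HyperLogLogPlusCounter;

    public class RowCountScaling {
        // Scale sampled cardinality estimates back to full-table row counts;
        // with samplingPercentage = 25 each estimate is multiplied by 4.
        static Map<Long, Long> rowCountsFromStats(Map<Long, HyperLogLogPlusCounter> counterMap,
                                                  int samplingPercentage) {
            Map<Long, Long> rowCounts = new HashMap<Long, Long>();
            for (Map.Entry<Long, HyperLogLogPlusCounter> e : counterMap.entrySet()) {
                rowCounts.put(e.getKey(), e.getValue().getCountEstimate() * 100 / samplingPercentage);
            }
            return rowCounts;
        }
    }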
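The per-cuboid size figure fed into the split calculation comes from the arithmetic that used to sit in CreateHTableJob.estimateCuboidStorageSize (removed above), and presumably now backs InMemCuboidJob.getCubeSizeMapFromRowCount: a per-row byte budget made of the row-key preamble, the encoded lengths of the row-key columns present in the cuboid, and the storage estimate of every measure, scaled by an empirical on-disk factor. A sketch of that arithmetic with the inputs passed in explicitly (helper name and signature are illustrative):

    import java.util.List;

    // bytes-per-row = preamble + participating row-key columns + all measures,
    // then size(MB) = bytesPerRow * rowCount / 1MB, discounted by an empirical factor.
    static double estimateCuboidMB(long cuboidId, long rowCount, long baseCuboidId,
                                   int preambleBytes, List<Integer> rowKeyColumnLength,
                                   int measureBytes, boolean memoryHungry) {
        int bytesPerRow = preambleBytes;
        long mask = Long.highestOneBit(baseCuboidId);
        int bits = Long.SIZE - Long.numberOfLeadingZeros(baseCuboidId);
        for (int i = 0; i < bits; i++) {
            if ((mask & cuboidId) > 0)            // this row-key column is present in the cuboid
                bytesPerRow += rowKeyColumnLength.get(i);
            mask >>= 1;
        }
        bytesPerRow += measureBytes;
        double mb = 1.0 * bytesPerRow * rowCount / (1024L * 1024L);
        return mb * (memoryHungry ? 0.05 : 0.25); // factors taken from the removed code
    }

The split step then sums these sizes, targets Math.round(totalSizeInM / (cut * 1024)) regions for the configured cut size in GB, and clamps the result between kylinConfig.getHBaseRegionCountMin() and getHBaseRegionCountMax(), as the retained code above shows.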
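The deleted HBaseMROutput2.HBaseInputFormat fed segment merges by scanning the HTables of the segments being merged. For reference, the scan wiring it used, condensed here with the mapper and key/value classes left as placeholders:

    // One Scan per merging segment's HTable, all handed to a single multi-table mapper job.
    List<Scan> scans = new ArrayList<Scan>();
    for (String htable : mergingHTableNames) {
        Scan scan = new Scan();
        scan.setCaching(512);        // Scan's default of 1 row per RPC is far too slow for MR
        scan.setCacheBlocks(false);  // keep a full-table batch read out of the block cache
        scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(htable));
        scans.add(scan);
    }
    TableMapReduceUtil.initTableMapperJob(scans, mapperClass, outputKeyClass, outputValueClass, job);
    TableMapReduceUtil.initCredentials(job);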
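Also gone with HBaseMROutput2 is the TotalOrderPartitioner workaround: HFileOutputFormat.configureIncrementalLoad writes the region split points keyed by ImmutableBytesWritable, while Kylin's cubing mappers key on ByteArrayWritable, so the deleted output format rewrote the partition file with the matching key class. A condensed sketch of that rewrite, assuming the job Configuration is in scope as conf:

    String partitionsFile = TotalOrderPartitioner.getPartitionFile(conf);
    Path partitionsPath = new Path(partitionsFile);

    // Read the split points back as ImmutableBytesWritable ...
    List<ByteArrayWritable> keys = new ArrayList<ByteArrayWritable>();
    SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(partitionsPath));
    try {
        ImmutableBytesWritable key = new ImmutableBytesWritable();
        while (reader.next(key, NullWritable.get()))
            keys.add(new ByteArrayWritable(key.copyBytes()));
    } finally {
        reader.close();
    }

    // ... and write them out again keyed by ByteArrayWritable for Kylin's mapper output.
    SequenceFile.Writer writer = SequenceFile.createWriter(conf,
            SequenceFile.Writer.file(partitionsPath),
            SequenceFile.Writer.keyClass(ByteArrayWritable.class),
            SequenceFile.Writer.valueClass(NullWritable.class));
    try {
        for (ByteArrayWritable key : keys)
            writer.append(key, NullWritable.get());
    } finally {
        writer.close();
    }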