Repository: kylin
Updated Branches:
  refs/heads/2.x-staging 2ef13d08e -> 3fc3883a4
http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc3883a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
index ac304f5..4c2737d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
@@ -18,53 +18,12 @@
 
 package org.apache.kylin.storage.hbase.steps;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
-import org.apache.hadoop.hbase.mapreduce.TableMapper;
-import org.apache.hadoop.hbase.mapreduce.TableSplit;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Mapper.Context;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
-import org.apache.kylin.engine.mr.ByteArrayWritable;
-import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.mr.IMROutput2;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.engine.mr.steps.MergeCuboidMapper;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.measure.MeasureCodec;
-import org.apache.kylin.metadata.model.MeasureDesc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
-
 /**
  * This "Transition" impl generates cuboid files and then convert to HFile.
  * The additional step slows down build process, but the gains is merge
@@ -77,6 +36,7 @@ import com.google.common.collect.Lists;
  */
 public class HBaseMROutput2Transition implements IMROutput2 {
 
+    @SuppressWarnings("unused")
     private static final Logger logger = LoggerFactory.getLogger(HBaseMROutput2Transition.class);
 
     @Override
@@ -85,19 +45,12 @@ public class HBaseMROutput2Transition implements IMROutput2 {
             HBaseMRSteps steps = new HBaseMRSteps(seg);
 
             @Override
-            public IMRStorageOutputFormat getStorageOutputFormat() {
-                return new HBaseOutputFormat(seg);
-            }
-
-            @Override
             public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow) {
                 jobFlow.addTask(steps.createCreateHTableStepWithStats(jobFlow.getId()));
             }
 
             @Override
-            public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
-                String cuboidRootPath = steps.getCuboidRootPath(jobFlow.getId());
-
+            public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath) {
                 jobFlow.addTask(steps.createConvertCuboidToHfileStep(cuboidRootPath, jobFlow.getId()));
                 jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
             }
@@ -110,34 +63,17 @@ public class HBaseMROutput2Transition implements IMROutput2 {
     }
 
     @Override
-    public IMRBatchMergeInputSide2 getBatchMergeInputSide(final CubeSegment seg) {
-        return new IMRBatchMergeInputSide2() {
-            @Override
-            public IMRStorageInputFormat getStorageInputFormat() {
-                return new HBaseInputFormat(seg);
-            }
-        };
-    }
-
-    @Override
     public IMRBatchMergeOutputSide2 getBatchMergeOutputSide(final CubeSegment seg) {
         return new IMRBatchMergeOutputSide2() {
             HBaseMRSteps steps = new HBaseMRSteps(seg);
 
             @Override
-            public IMRStorageOutputFormat getStorageOutputFormat() {
-                return new HBaseOutputFormat(seg);
-            }
-
-            @Override
             public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
                 jobFlow.addTask(steps.createCreateHTableStepWithStats(jobFlow.getId()));
             }
 
             @Override
-            public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow) {
-                String cuboidRootPath = steps.getCuboidRootPath(jobFlow.getId());
-
+            public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath) {
                 jobFlow.addTask(steps.createConvertCuboidToHfileStep(cuboidRootPath, jobFlow.getId()));
                 jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
             }
@@ -148,263 +84,4 @@ public class HBaseMROutput2Transition implements IMROutput2 {
             }
         };
     }
-
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    private static class HBaseInputFormat implements IMRStorageInputFormat {
-        final CubeSegment seg;
-
-        public HBaseInputFormat(CubeSegment seg) {
-            this.seg = seg;
-        }
-
-        @Override
-        public void configureInput(Class<? extends Mapper> mapperClz, Class<? extends WritableComparable> outputKeyClz, Class<? extends Writable> outputValueClz, Job job) throws IOException {
-            // merge by cuboid files
-            if (isMergeFromCuboidFiles(job.getConfiguration())) {
-                logger.info("Merge from cuboid files for " + seg);
-
-                job.setInputFormatClass(SequenceFileInputFormat.class);
-                addCuboidInputDirs(job);
-
-                job.setMapperClass(mapperClz);
-                job.setMapOutputKeyClass(outputKeyClz);
-                job.setMapOutputValueClass(outputValueClz);
-            }
-            // merge from HTable scan
-            else {
-                logger.info("Merge from HTables for " + seg);
-
-                Configuration conf = job.getConfiguration();
-                HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
-
-                List<Scan> scans = new ArrayList<Scan>();
-                for (String htable : new HBaseMRSteps(seg).getMergingHTables()) {
-                    Scan scan = new Scan();
-                    scan.setCaching(512); // 1 is the default in Scan, which will be bad for MapReduce jobs
-                    scan.setCacheBlocks(false); // don't set to true for MR jobs
-                    scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(htable));
-                    scans.add(scan);
-                }
-                TableMapReduceUtil.initTableMapperJob(scans, (Class<? extends TableMapper>) mapperClz, outputKeyClz, outputValueClz, job);
-                TableMapReduceUtil.initCredentials(job);
-            }
-        }
-
-        private void addCuboidInputDirs(Job job) throws IOException {
-            List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
-            HBaseMRSteps steps = new HBaseMRSteps(seg);
-
-            String[] inputs = new String[mergingSegments.size()];
-            for (int i = 0; i < mergingSegments.size(); i++) {
-                CubeSegment mergingSeg = mergingSegments.get(i);
-                String cuboidPath = steps.getCuboidRootPath(mergingSeg);
-                inputs[i] = cuboidPath + (cuboidPath.endsWith("/") ? "" : "/") + "*";
-            }
-
-            AbstractHadoopJob.addInputDirs(inputs, job);
-        }
-
-        @Override
-        public CubeSegment findSourceSegment(Context context) throws IOException {
-            // merge by cuboid files
-            if (isMergeFromCuboidFiles(context.getConfiguration())) {
-                FileSplit fileSplit = (FileSplit) context.getInputSplit();
-                return MergeCuboidMapper.findSourceSegment(fileSplit, seg.getCubeInstance());
-            }
-            // merge from HTable scan
-            else {
-                TableSplit currentSplit = (TableSplit) context.getInputSplit();
-                byte[] tableName = currentSplit.getTableName();
-                String htableName = Bytes.toString(tableName);
-
-                // decide which source segment
-                for (CubeSegment segment : seg.getCubeInstance().getSegments()) {
-                    String segmentHtable = segment.getStorageLocationIdentifier();
-                    if (segmentHtable != null && segmentHtable.equalsIgnoreCase(htableName)) {
-                        return segment;
-                    }
-                }
-                throw new IllegalStateException("No merging segment's storage location identifier equals " + htableName);
-            }
-        }
-
-        transient ByteArrayWritable parsedKey;
-        transient Object[] parsedValue;
-        transient Pair<ByteArrayWritable, Object[]> parsedPair;
-
-        transient MeasureCodec measureCodec;
-        transient RowValueDecoder[] rowValueDecoders;
-
-        @Override
-        public Pair<ByteArrayWritable, Object[]> parseMapperInput(Object inKey, Object inValue) {
-            // lazy init
-            if (parsedPair == null) {
-                parsedKey = new ByteArrayWritable();
-                parsedValue = new Object[seg.getCubeDesc().getMeasures().size()];
-                parsedPair = Pair.newPair(parsedKey, parsedValue);
-            }
-
-            // merge by cuboid files
-            if (isMergeFromCuboidFiles(null)) {
-                return parseMapperInputFromCuboidFile(inKey, inValue);
-            }
-            // merge from HTable scan
-            else {
-                return parseMapperInputFromHTable(inKey, inValue);
-            }
-        }
-
-        private Pair<ByteArrayWritable, Object[]> parseMapperInputFromCuboidFile(Object inKey, Object inValue) {
-            // lazy init
-            if (measureCodec == null) {
-                measureCodec = new MeasureCodec(seg.getCubeDesc().getMeasures());
-            }
-
-            Text key = (Text) inKey;
-            parsedKey.set(key.getBytes(), 0, key.getLength());
-
-            Text value = (Text) inValue;
-            measureCodec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), parsedValue);
-
-            return parsedPair;
-        }
-
-        private Pair<ByteArrayWritable, Object[]> parseMapperInputFromHTable(Object inKey, Object inValue) {
-            // lazy init
-            if (rowValueDecoders == null) {
-                List<RowValueDecoder> valueDecoderList = Lists.newArrayList();
-                List<MeasureDesc> measuresDescs = Lists.newArrayList();
-                for (HBaseColumnFamilyDesc cfDesc : seg.getCubeDesc().getHbaseMapping().getColumnFamily()) {
-                    for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
-                        valueDecoderList.add(new RowValueDecoder(colDesc));
-                        for (MeasureDesc measure : colDesc.getMeasures()) {
-                            measuresDescs.add(measure);
-                        }
-                    }
-                }
-                rowValueDecoders = valueDecoderList.toArray(new RowValueDecoder[valueDecoderList.size()]);
-            }
-
-            ImmutableBytesWritable key = (ImmutableBytesWritable) inKey;
-            parsedKey.set(key.get(), key.getOffset(), key.getLength());
-
-            Result hbaseRow = (Result) inValue;
-            for (int i = 0; i < rowValueDecoders.length; i++) {
-                rowValueDecoders[i].decode(hbaseRow);
-                rowValueDecoders[i].loadCubeMeasureArray(parsedValue);
-            }
-
-            return parsedPair;
-        }
-
-        transient Boolean isMergeFromCuboidFiles;
-
-        // merge from cuboid files is better than merge from HTable, because no workload on HBase region server
-        private boolean isMergeFromCuboidFiles(Configuration jobConf) {
-            // cache in this object?
-            if (isMergeFromCuboidFiles != null)
-                return isMergeFromCuboidFiles.booleanValue();
-
-            final String confKey = "kylin.isMergeFromCuboidFiles";
-
-            // cache in job configuration?
-            if (jobConf != null) {
-                String result = jobConf.get(confKey);
-                if (result != null) {
-                    isMergeFromCuboidFiles = Boolean.valueOf(result);
-                    return isMergeFromCuboidFiles.booleanValue();
-                }
-            }
-
-            boolean result = true;
-
-            try {
-                List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
-                HBaseMRSteps steps = new HBaseMRSteps(seg);
-                for (CubeSegment mergingSeg : mergingSegments) {
-                    String cuboidRootPath = steps.getCuboidRootPath(mergingSeg);
-                    FileSystem fs = HadoopUtil.getFileSystem(cuboidRootPath);
-
-                    boolean cuboidFileExist = fs.exists(new Path(cuboidRootPath));
-                    if (cuboidFileExist == false) {
-                        logger.info("Merge from HTable because " + cuboidRootPath + " does not exist");
-                        result = false;
-                        break;
-                    }
-                }
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
-
-            // put in cache
-            isMergeFromCuboidFiles = Boolean.valueOf(result);
-            if (jobConf != null) {
-                jobConf.set(confKey, "" + result);
-            }
-            return result;
-        }
-    }
-
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    private static class HBaseOutputFormat implements IMRStorageOutputFormat {
-        final CubeSegment seg;
-
-        Text outputKey;
-        Text outputValue;
-        ByteBuffer valueBuf;
-        MeasureCodec codec;
-
-        public HBaseOutputFormat(CubeSegment seg) {
-            this.seg = seg;
-        }
-
-        @Override
-        public void configureOutput(Class<? extends Reducer> reducer, String jobFlowId, Job job) throws IOException {
-            job.setReducerClass(reducer);
-
-            // estimate the reducer number
-            String htableName = seg.getStorageLocationIdentifier();
-            Configuration conf = HBaseConfiguration.create(job.getConfiguration());
-            HTable htable = new HTable(conf, htableName);
-            int regions = htable.getStartKeys().length + 1;
-            htable.close();
-
-            int reducerNum = regions * 3;
-            reducerNum = Math.max(1, reducerNum);
-            KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-            reducerNum = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), reducerNum);
-
-            job.setNumReduceTasks(reducerNum);
-
-            // the cuboid file and KV class must be compatible with 0.7 version for smooth upgrade
-            job.setOutputFormatClass(SequenceFileOutputFormat.class);
-            job.setOutputKeyClass(Text.class);
-            job.setOutputValueClass(Text.class);
-
-            Path output = new Path(new HBaseMRSteps(seg).getCuboidRootPath(jobFlowId));
-            FileOutputFormat.setOutputPath(job, output);
-
-            HadoopUtil.deletePath(job.getConfiguration(), output);
-        }
-
-        @Override
-        public void doReducerOutput(ByteArrayWritable key, Object[] value, Reducer.Context context) throws IOException, InterruptedException {
-            // lazy init
-            if (outputKey == null) {
-                outputKey = new Text();
-                outputValue = new Text();
-                valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-                codec = new MeasureCodec(seg.getCubeDesc().getMeasures());
-            }
-
-            outputKey.set(key.array(), key.offset(), key.length());
-
-            valueBuf.clear();
-            codec.encode(value, valueBuf);
-            outputValue.set(valueBuf.array(), 0, valueBuf.position());
-
-            context.write(outputKey, outputValue);
-        }
-    }
-
-}
+}
\ No newline at end of file