http://git-wip-us.apache.org/repos/asf/kylin/blob/0c4b3ad5/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java new file mode 100644 index 0000000..0379f64 --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.engine.mr.common; + +import java.io.IOException; +import java.util.Map; + +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.cuboid.CuboidScheduler; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.job.exception.JobException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MapReduceUtil { + + private static final Logger logger = LoggerFactory.getLogger(MapReduceUtil.class); + + /** + * @param cuboidScheduler specified can provide more flexibility + * */ + public static int getLayeredCubingReduceTaskNum(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler, + double totalMapInputMB, int level) + throws ClassNotFoundException, IOException, InterruptedException, JobException { + CubeDesc cubeDesc = cubeSegment.getCubeDesc(); + KylinConfig kylinConfig = cubeDesc.getConfig(); + + double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB(); + double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio(); + logger.info("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio + ", level " + + level); + + CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, cuboidScheduler, kylinConfig); + + double parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst; + + if (level == -1) { + //merge case + double estimatedSize = cubeStatsReader.estimateCubeSize(); + adjustedCurrentLayerSizeEst = estimatedSize > totalMapInputMB ? 
totalMapInputMB : estimatedSize; + logger.debug("estimated size {}, input size {}, adjustedCurrentLayerSizeEst: {}", estimatedSize, + totalMapInputMB, adjustedCurrentLayerSizeEst); + } else if (level == 0) { + //base cuboid case TODO: the estimation could be very WRONG because it has no correction + adjustedCurrentLayerSizeEst = cubeStatsReader.estimateLayerSize(0); + logger.debug("adjustedCurrentLayerSizeEst: {}", adjustedCurrentLayerSizeEst); + } else { + parentLayerSizeEst = cubeStatsReader.estimateLayerSize(level - 1); + currentLayerSizeEst = cubeStatsReader.estimateLayerSize(level); + adjustedCurrentLayerSizeEst = totalMapInputMB / parentLayerSizeEst * currentLayerSizeEst; + logger.debug( + "totalMapInputMB: {}, parentLayerSizeEst: {}, currentLayerSizeEst: {}, adjustedCurrentLayerSizeEst: {}", + totalMapInputMB, parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst); + } + + // number of reduce tasks + int numReduceTasks = (int) Math.round(adjustedCurrentLayerSizeEst / perReduceInputMB * reduceCountRatio + 0.99); + + // adjust reducer number for cube which has DISTINCT_COUNT measures for better performance + if (cubeDesc.hasMemoryHungryMeasures()) { + logger.debug("Multiply reducer num by 4 to boost performance for memory hungry measures"); + numReduceTasks = numReduceTasks * 4; + } + + // at least 1 reducer by default + numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks); + // no more than 500 reducer by default + numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks); + + return numReduceTasks; + } + + public static int getInmemCubingReduceTaskNum(CubeSegment cubeSeg, CuboidScheduler cuboidScheduler) + throws IOException { + KylinConfig kylinConfig = cubeSeg.getConfig(); + + Map<Long, Double> cubeSizeMap = new CubeStatsReader(cubeSeg, cuboidScheduler, kylinConfig).getCuboidSizeMap(); + double totalSizeInM = 0; + for (Double cuboidSize : cubeSizeMap.values()) { + totalSizeInM += 
cuboidSize; + } + + double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB(); + double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio(); + + // number of reduce tasks + int numReduceTasks = (int) Math.round(totalSizeInM / perReduceInputMB * reduceCountRatio); + + // at least 1 reducer by default + numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks); + // no more than 500 reducer by default + numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks); + + logger.info("Having total map input MB " + Math.round(totalSizeInM)); + logger.info("Having per reduce MB " + perReduceInputMB); + logger.info("Setting " + Reducer.Context.NUM_REDUCES + "=" + numReduceTasks); + return numReduceTasks; + } +}
http://git-wip-us.apache.org/repos/asf/kylin/blob/0c4b3ad5/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java index cfac0e7..3b68bbf 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java @@ -38,7 +38,7 @@ public class StatisticsDecisionUtil { protected static final Logger logger = LoggerFactory.getLogger(StatisticsDecisionUtil.class); public static void decideCubingAlgorithm(CubingJob cubingJob, CubeSegment seg) throws IOException { - CubeStatsReader cubeStats = new CubeStatsReader(seg, seg.getConfig()); + CubeStatsReader cubeStats = new CubeStatsReader(seg, null, seg.getConfig()); decideCubingAlgorithm(cubingJob, seg, cubeStats.getMapperOverlapRatioOfFirstBuild(), cubeStats.getMapperNumberOfFirstBuild()); } http://git-wip-us.apache.org/repos/asf/kylin/blob/0c4b3ad5/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java index 93e413b..229bd85 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java @@ -60,7 +60,7 @@ abstract public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<K @Override protected void setup(Context context) throws IOException { super.bindCurrentConfiguration(context.getConfiguration()); - cubeName = 
context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase(); + cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME); segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID); final KylinConfig kylinConfig = AbstractHadoopJob.loadKylinPropsAndMetadata(); cube = CubeManager.getInstance(kylinConfig).getCube(cubeName); http://git-wip-us.apache.org/repos/asf/kylin/blob/0c4b3ad5/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java new file mode 100644 index 0000000..b60076c --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.engine.mr.steps; + +import java.io.IOException; + +import org.apache.commons.cli.Options; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.engine.mr.common.AbstractHadoopJob; +import org.apache.kylin.engine.mr.common.BatchConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CalculateStatsFromBaseCuboidJob extends AbstractHadoopJob { + + private static final Logger logger = LoggerFactory.getLogger(CalculateStatsFromBaseCuboidJob.class); + + @Override + public int run(String[] args) throws Exception { + Options options = new Options(); + + try { + options.addOption(OPTION_JOB_NAME); + options.addOption(OPTION_CUBE_NAME); + options.addOption(OPTION_SEGMENT_ID); + options.addOption(OPTION_INPUT_PATH); + options.addOption(OPTION_OUTPUT_PATH); + options.addOption(OPTION_STATISTICS_SAMPLING_PERCENT); + options.addOption(OPTION_CUBOID_MODE); + parseOptions(options, args); + + job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME)); + String cubeName = getOptionValue(OPTION_CUBE_NAME); + String segmentID = getOptionValue(OPTION_SEGMENT_ID); + Path input = new Path(getOptionValue(OPTION_INPUT_PATH)); + Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); + String statistics_sampling_percent = getOptionValue(OPTION_STATISTICS_SAMPLING_PERCENT); + String cuboidMode = getOptionValue(OPTION_CUBOID_MODE); + + CubeManager cubeMgr = 
CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); + CubeInstance cube = cubeMgr.getCube(cubeName); + CubeSegment cubeSegment = cube.getSegmentById(segmentID); + + job.getConfiguration().set(BatchConstants.CFG_CUBOID_MODE, cuboidMode); + job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName); + job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID); + job.getConfiguration().set(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, statistics_sampling_percent); + logger.info("Starting: " + job.getJobName()); + + setJobClasspath(job, cube.getConfig()); + + setupMapper(input); + setupReducer(output, 1); + + attachSegmentMetadataWithDict(cubeSegment, job.getConfiguration()); + + return waitForCompletion(job); + + } catch (Exception e) { + logger.error("error in CalculateStatsFromBaseCuboidJob", e); + printUsage(options); + throw e; + } finally { + if (job != null) + cleanupTempConfFile(job.getConfiguration()); + } + } + + private void setupMapper(Path input) throws IOException { + FileInputFormat.setInputPaths(job, input); + job.setMapperClass(CalculateStatsFromBaseCuboidMapper.class); + job.setInputFormatClass(SequenceFileInputFormat.class); + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(Text.class); + } + + private void setupReducer(Path output, int numberOfReducers) throws IOException { + job.setReducerClass(CalculateStatsFromBaseCuboidReducer.class); + job.setOutputFormatClass(SequenceFileOutputFormat.class); + job.setOutputKeyClass(NullWritable.class); + job.setOutputValueClass(Text.class); + job.setNumReduceTasks(numberOfReducers); + + FileOutputFormat.setOutputPath(job, output); + job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString()); + + deletePath(job.getConfiguration(), output); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/0c4b3ad5/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidMapper.java 
---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidMapper.java new file mode 100644 index 0000000..fbc7c7c --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidMapper.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.engine.mr.steps; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.ByteArray; +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.cuboid.CuboidUtil; +import org.apache.kylin.cube.kv.RowKeyDecoder; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.engine.mr.KylinMapper; +import org.apache.kylin.engine.mr.common.AbstractHadoopJob; +import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.measure.BufferedMeasureCodec; +import org.apache.kylin.measure.hllc.HLLCounter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hasher; +import com.google.common.hash.Hashing; + +public class CalculateStatsFromBaseCuboidMapper extends KylinMapper<Text, Text, Text, Text> { + private static final Logger logger = LoggerFactory.getLogger(CalculateStatsFromBaseCuboidMapper.class); + + protected int nRowKey; + protected long baseCuboidId; + + private int samplingPercentage; + private int rowCount = 0; + + private HLLCounter[] allCuboidsHLL = null; + private Long[] cuboidIds; + private Integer[][] allCuboidsBitSet = null; + private ByteArray[] row_hashcodes = null; + private HashFunction hf = null; + + RowKeyDecoder rowKeyDecoder; + + protected Text outputKey = new Text(); + protected Text outputValue = new Text(); + + @Override + protected void setup(Context context) throws IOException { + Configuration conf = context.getConfiguration(); + 
HadoopUtil.setCurrentConfiguration(conf); + KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(); + + String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME); + CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName); + CubeDesc cubeDesc = cube.getDescriptor(); + CubeSegment cubeSegment = cube.getSegmentById(conf.get(BatchConstants.CFG_CUBE_SEGMENT_ID)); + + baseCuboidId = cube.getCuboidScheduler().getBaseCuboidId(); + nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length; + + String cuboidModeName = conf.get(BatchConstants.CFG_CUBOID_MODE); + Set<Long> cuboidIdSet = cube.getCuboidsByMode(cuboidModeName); + + cuboidIds = cuboidIdSet.toArray(new Long[cuboidIdSet.size()]); + allCuboidsBitSet = CuboidUtil.getCuboidBitSet(cuboidIds, nRowKey); + + samplingPercentage = Integer + .parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT)); + + allCuboidsHLL = new HLLCounter[cuboidIds.length]; + for (int i = 0; i < cuboidIds.length; i++) { + allCuboidsHLL[i] = new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision()); + } + + hf = Hashing.murmur3_32(); + row_hashcodes = new ByteArray[nRowKey]; + for (int i = 0; i < nRowKey; i++) { + row_hashcodes[i] = new ByteArray(); + } + + rowKeyDecoder = new RowKeyDecoder(cubeSegment); + } + + @Override + public void doMap(Text key, Text value, Context context) throws InterruptedException, IOException { + long cuboidID = rowKeyDecoder.decode(key.getBytes()); + if (cuboidID != baseCuboidId) { + return; // Skip data from cuboids which are not the base cuboid + } + + List<String> keyValues = rowKeyDecoder.getValues(); + + if (rowCount < samplingPercentage) { + Preconditions.checkArgument(nRowKey == keyValues.size()); + + String[] row = keyValues.toArray(new String[keyValues.size()]); + + putRowKeyToHLL(row); + } + + if (++rowCount == 100) + rowCount = 0; + } + + public void putRowKeyToHLL(String[] row) { + //generate hash for each row key column + for (int i = 0; i < 
nRowKey; i++) { + Hasher hc = hf.newHasher(); + String colValue = row[i]; + if (colValue != null) { + row_hashcodes[i].set(hc.putString(colValue).hash().asBytes()); + } else { + row_hashcodes[i].set(hc.putInt(0).hash().asBytes()); + } + } + + // use the row key column hash to get a consolidated hash for each cuboid + for (int i = 0; i < cuboidIds.length; i++) { + Hasher hc = hf.newHasher(); + for (int position = 0; position < allCuboidsBitSet[i].length; position++) { + hc.putBytes(row_hashcodes[allCuboidsBitSet[i][position]].array()); + } + + allCuboidsHLL[i].add(hc.hash().asBytes()); + } + } + + @Override + protected void doCleanup(Context context) throws IOException, InterruptedException { + ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE); + HLLCounter hll; + for (int i = 0; i < cuboidIds.length; i++) { + hll = allCuboidsHLL[i]; + + outputKey.set(Bytes.toBytes(cuboidIds[i])); + hllBuf.clear(); + hll.writeRegisters(hllBuf); + outputValue.set(hllBuf.array(), 0, hllBuf.position()); + context.write(outputKey, outputValue); + } + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/0c4b3ad5/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidReducer.java new file mode 100644 index 0000000..756c233 --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidReducer.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.engine.mr.steps; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.engine.mr.KylinReducer; +import org.apache.kylin.engine.mr.common.AbstractHadoopJob; +import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.engine.mr.common.CubeStatsWriter; +import org.apache.kylin.measure.hllc.HLLCounter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +public class CalculateStatsFromBaseCuboidReducer extends KylinReducer<Text, Text, NullWritable, Text> { + + private static final Logger logger = LoggerFactory.getLogger(CalculateStatsFromBaseCuboidReducer.class); + + private KylinConfig cubeConfig; + protected long baseCuboidId; + protected Map<Long, HLLCounter> cuboidHLLMap = null; + private List<Long> baseCuboidRowCountInMappers; + private long totalRowsBeforeMerge = 0; + + private String output = null; + private int samplingPercentage; + + 
@Override + protected void setup(Context context) throws IOException { + super.bindCurrentConfiguration(context.getConfiguration()); + + Configuration conf = context.getConfiguration(); + KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(); + String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME); + CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName); + cubeConfig = cube.getConfig(); + + baseCuboidId = cube.getCuboidScheduler().getBaseCuboidId(); + baseCuboidRowCountInMappers = Lists.newLinkedList(); + + output = conf.get(BatchConstants.CFG_OUTPUT_PATH); + samplingPercentage = Integer + .parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT)); + + cuboidHLLMap = Maps.newHashMap(); + } + + @Override + public void doReduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { + long cuboidId = Bytes.toLong(key.getBytes()); + logger.info("Cuboid id to be processed: " + cuboidId); + for (Text value : values) { + HLLCounter hll = new HLLCounter(cubeConfig.getCubeStatsHLLPrecision()); + ByteBuffer bf = ByteBuffer.wrap(value.getBytes(), 0, value.getLength()); + hll.readRegisters(bf); + + if (cuboidId == baseCuboidId) { + baseCuboidRowCountInMappers.add(hll.getCountEstimate()); + } + + totalRowsBeforeMerge += hll.getCountEstimate(); + + if (cuboidHLLMap.get(cuboidId) != null) { + cuboidHLLMap.get(cuboidId).merge(hll); + } else { + cuboidHLLMap.put(cuboidId, hll); + } + } + } + + @Override + protected void doCleanup(Context context) throws IOException, InterruptedException { + long grandTotal = 0; + for (HLLCounter hll : cuboidHLLMap.values()) { + grandTotal += hll.getCountEstimate(); + } + double mapperOverlapRatio = grandTotal == 0 ? 
0 : (double) totalRowsBeforeMerge / grandTotal; + + CubeStatsWriter.writeCuboidStatistics(context.getConfiguration(), new Path(output), // + cuboidHLLMap, samplingPercentage, baseCuboidRowCountInMappers.size(), mapperOverlapRatio); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/0c4b3ad5/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java new file mode 100644 index 0000000..78377ae --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.engine.mr.steps; + +import java.io.IOException; + +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.CubeUpdate; +import org.apache.kylin.job.exception.ExecuteException; +import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.ExecutableContext; +import org.apache.kylin.job.execution.ExecuteResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +public class CopyDictionaryStep extends AbstractExecutable { + + private static final Logger logger = LoggerFactory.getLogger(AbstractExecutable.class); + + public CopyDictionaryStep() { + super(); + } + + @Override + protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { + final CubeManager mgr = CubeManager.getInstance(context.getConfig()); + final CubeInstance cube = mgr.getCube(CubingExecutableUtil.getCubeName(this.getParams())); + final CubeSegment optimizeSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams())); + + CubeSegment oldSegment = optimizeSegment.getCubeInstance().getOriginalSegmentToOptimize(optimizeSegment); + Preconditions.checkNotNull(oldSegment, + "cannot find the original segment to be optimized by " + optimizeSegment); + + // --- Copy dictionary + optimizeSegment.getDictionaries().putAll(oldSegment.getDictionaries()); + optimizeSegment.getSnapshots().putAll(oldSegment.getSnapshots()); + + try { + CubeUpdate cubeBuilder = new CubeUpdate(cube); + cubeBuilder.setToUpdateSegs(optimizeSegment); + mgr.updateCube(cubeBuilder); + } catch (IOException e) { + logger.error("fail to merge dictionary or lookup snapshots", e); + return new ExecuteResult(e, e.getLocalizedMessage()); + } + + return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed"); + } +} 
http://git-wip-us.apache.org/repos/asf/kylin/blob/0c4b3ad5/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java index 65c5869..e06077a 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java @@ -38,6 +38,7 @@ import javax.annotation.Nullable; public class CubingExecutableUtil { public static final String CUBE_NAME = "cubeName"; + public static final String SEGMENT_NAME = "segmentName"; public static final String SEGMENT_ID = "segmentId"; public static final String MERGING_SEGMENT_IDS = "mergingSegmentIds"; public static final String STATISTICS_PATH = "statisticsPath"; @@ -61,6 +62,14 @@ public class CubingExecutableUtil { return params.get(CUBE_NAME); } + public static void setSegmentName(String segmentName, Map<String, String> params) { + params.put(SEGMENT_NAME, segmentName); + } + + public static String getSegmentName(Map<String, String> params) { + return params.get(SEGMENT_NAME); + } + public static void setSegmentId(String segmentId, Map<String, String> params) { params.put(SEGMENT_ID, segmentId); } http://git-wip-us.apache.org/repos/asf/kylin/blob/0c4b3ad5/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java index 6a8ba4c..b49b639 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java @@ -28,12 +28,15 @@ 
import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.cuboid.CuboidModeEnum; +import org.apache.kylin.cube.cuboid.CuboidScheduler; import org.apache.kylin.engine.mr.CubingJob; import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat; import org.apache.kylin.engine.mr.IMROutput2; import org.apache.kylin.engine.mr.MRUtil; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.engine.mr.common.CuboidSchedulerUtil; import org.apache.kylin.job.execution.ExecutableManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,18 +53,27 @@ public class CuboidJob extends AbstractHadoopJob { private boolean skipped = false; + private CuboidScheduler cuboidScheduler; + @Override public boolean isSkipped() { return skipped; } - private boolean checkSkip(String cubingJobId) { + private boolean checkSkip(String cubingJobId, int level) { if (cubingJobId == null) return false; ExecutableManager execMgr = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv()); CubingJob cubingJob = (CubingJob) execMgr.getJob(cubingJobId); skipped = cubingJob.isLayerCubing() == false; + if (!skipped) { + skipped = (level > cuboidScheduler.getBuildLevel()); + if (skipped) { + logger.info("Job level: " + level + " for " + cubingJobId + "[" + cubingJobId + + "] exceeds real cuboid tree levels : " + cuboidScheduler.getBuildLevel()); + } + } return skipped; } @@ -80,6 +92,7 @@ public class CuboidJob extends AbstractHadoopJob { options.addOption(OPTION_OUTPUT_PATH); options.addOption(OPTION_NCUBOID_LEVEL); options.addOption(OPTION_CUBING_JOB_ID); + options.addOption(OPTION_CUBOID_MODE); parseOptions(options, args); String output = getOptionValue(OPTION_OUTPUT_PATH); @@ -87,12 +100,18 @@ public class CuboidJob extends AbstractHadoopJob { int 
nCuboidLevel = Integer.parseInt(getOptionValue(OPTION_NCUBOID_LEVEL)); String segmentID = getOptionValue(OPTION_SEGMENT_ID); String cubingJobId = getOptionValue(OPTION_CUBING_JOB_ID); + String cuboidModeName = getOptionValue(OPTION_CUBOID_MODE); + if (cuboidModeName == null) { + cuboidModeName = CuboidModeEnum.CURRENT.toString(); + } CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); CubeInstance cube = cubeMgr.getCube(cubeName); CubeSegment segment = cube.getSegmentById(segmentID); - if (checkSkip(cubingJobId)) { + cuboidScheduler = CuboidSchedulerUtil.getCuboidSchedulerByMode(segment, cuboidModeName); + + if (checkSkip(cubingJobId, nCuboidLevel)) { logger.info("Skip job " + getOptionValue(OPTION_JOB_NAME) + " for " + segmentID + "[" + segmentID + "]"); return 0; } @@ -104,7 +123,7 @@ public class CuboidJob extends AbstractHadoopJob { setJobClasspath(job, cube.getConfig()); // add metadata to distributed cache - attachSegmentMetadataWithDict(segment, job.getConfiguration()); + attachSegmentMetadataWithAll(segment, job.getConfiguration()); // Mapper job.setMapperClass(this.mapperClass); @@ -122,12 +141,13 @@ public class CuboidJob extends AbstractHadoopJob { // set output IMROutput2.IMROutputFormat outputFormat = MRUtil.getBatchCubingOutputSide2(segment).getOuputFormat(); - outputFormat.configureJobOutput(job, output, segment, nCuboidLevel); + outputFormat.configureJobOutput(job, output, segment, cuboidScheduler, nCuboidLevel); // set job configuration job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName); job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID); job.getConfiguration().setInt(BatchConstants.CFG_CUBE_CUBOID_LEVEL, nCuboidLevel); + job.getConfiguration().set(BatchConstants.CFG_CUBOID_MODE, cuboidModeName); return waitForCompletion(job); } finally { 
http://git-wip-us.apache.org/repos/asf/kylin/blob/0c4b3ad5/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java index 4e4c332..b5dc853 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java @@ -21,14 +21,14 @@ package org.apache.kylin.engine.mr.steps; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collection; -import java.util.List; +import java.util.Set; import org.apache.hadoop.io.Text; import org.apache.kylin.common.KylinVersion; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.StringUtil; -import org.apache.kylin.cube.cuboid.CuboidScheduler; +import org.apache.kylin.cube.cuboid.CuboidUtil; import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.measure.BufferedMeasureCodec; import org.apache.kylin.measure.hllc.HLLCounter; @@ -38,7 +38,6 @@ import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; import com.google.common.hash.HashFunction; import com.google.common.hash.Hasher; import com.google.common.hash.Hashing; @@ -57,7 +56,6 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB protected boolean collectStatistics = false; - protected CuboidScheduler cuboidScheduler = null; protected int nRowKey; private Integer[][] allCuboidsBitSet = null; private HLLCounter[] allCuboidsHLL = null; @@ -88,15 +86,11 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB 
collectStatistics = Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_ENABLED)); if (collectStatistics) { samplingPercentage = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT)); - cuboidScheduler = cubeSeg.getCuboidScheduler(); nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length; - List<Long> cuboidIdList = Lists.newArrayList(); - List<Integer[]> allCuboidsBitSetList = Lists.newArrayList(); - addCuboidBitSet(baseCuboidId, allCuboidsBitSetList, cuboidIdList); - - allCuboidsBitSet = allCuboidsBitSetList.toArray(new Integer[cuboidIdList.size()][]); - cuboidIds = cuboidIdList.toArray(new Long[cuboidIdList.size()]); + Set<Long> cuboidIdSet = cubeSeg.getCuboidScheduler().getAllCuboidIds(); + cuboidIds = cuboidIdSet.toArray(new Long[cuboidIdSet.size()]); + allCuboidsBitSet = CuboidUtil.getCuboidBitSet(cuboidIds, nRowKey); allCuboidsHLL = new HLLCounter[cuboidIds.length]; for (int i = 0; i < cuboidIds.length; i++) { @@ -135,27 +129,6 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB } - private void addCuboidBitSet(long cuboidId, List<Integer[]> allCuboidsBitSet, List<Long> allCuboids) { - allCuboids.add(cuboidId); - Integer[] indice = new Integer[Long.bitCount(cuboidId)]; - - long mask = Long.highestOneBit(baseCuboidId); - int position = 0; - for (int i = 0; i < nRowKey; i++) { - if ((mask & cuboidId) > 0) { - indice[position] = i; - position++; - } - mask = mask >> 1; - } - - allCuboidsBitSet.add(indice); - Collection<Long> children = cuboidScheduler.getSpanningCuboid(cuboidId); - for (Long childId : children) { - addCuboidBitSet(childId, allCuboidsBitSet, allCuboids); - } - } - @Override public void doMap(KEYIN key, Object record, Context context) throws IOException, InterruptedException { Collection<String[]> rowCollection = flatTableInputFormat.parseMapperInput(record); 
http://git-wip-us.apache.org/repos/asf/kylin/blob/0c4b3ad5/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java index 7f01c3a..f018afe 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java @@ -26,7 +26,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import com.google.common.base.Preconditions; import org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.BytesWritable; @@ -52,6 +51,7 @@ import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -110,6 +110,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK if (collectStatistics && (taskId == numberOfTasks - 1)) { // hll isStatistics = true; + baseCuboidId = cube.getCuboidScheduler().getBaseCuboidId(); baseCuboidRowCountInMappers = Lists.newArrayList(); cuboidHLLMap = Maps.newHashMap(); samplingPercentage = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT)); http://git-wip-us.apache.org/repos/asf/kylin/blob/0c4b3ad5/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataJob.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataJob.java new file mode 100644 index 0000000..97f9dc1 --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataJob.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.kylin.engine.mr.steps;

import org.apache.commons.cli.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Map-only MR job that filters an existing segment's cuboid data down to the
 * base cuboid plus the recommended cuboids for the segment being optimized.
 * The actual routing/filtering is done by {@link FilterRecommendCuboidDataMapper}.
 */
public class FilterRecommendCuboidDataJob extends AbstractHadoopJob {

    private static final Logger logger = LoggerFactory.getLogger(FilterRecommendCuboidDataJob.class);

    @Override
    public int run(String[] args) throws Exception {
        Options options = new Options();
        try {
            options.addOption(OPTION_JOB_NAME);
            options.addOption(OPTION_CUBE_NAME);
            options.addOption(OPTION_SEGMENT_ID);
            options.addOption(OPTION_INPUT_PATH);
            options.addOption(OPTION_OUTPUT_PATH);
            parseOptions(options, args);

            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
            String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
            String segmentID = getOptionValue(OPTION_SEGMENT_ID);
            Path input = new Path(getOptionValue(OPTION_INPUT_PATH));
            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));

            CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
            CubeInstance cube = cubeMgr.getCube(cubeName);
            CubeSegment optSegment = cube.getSegmentById(segmentID);
            // the segment being optimized reads metadata/dictionaries from the
            // original segment it was derived from (see attachSegmentMetadataWithDict below)
            CubeSegment originalSegment = cube.getOriginalSegmentToOptimize(optSegment);

            logger.info("Starting: {}", job.getJobName());

            setJobClasspath(job, cube.getConfig());

            // Mapper — map-only job, the mapper routes rows to base/old cuboid outputs
            job.setMapperClass(FilterRecommendCuboidDataMapper.class);

            // Reducer — none
            job.setNumReduceTasks(0);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            // Input
            job.setInputFormatClass(SequenceFileInputFormat.class);
            FileInputFormat.setInputPaths(job, input);
            // Output
            job.setOutputFormatClass(SequenceFileOutputFormat.class);
            FileOutputFormat.setOutputPath(job, output);

            // set job configuration
            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
            // add metadata to distributed cache, taken from the ORIGINAL segment
            attachSegmentMetadataWithDict(originalSegment, job.getConfiguration());

            this.deletePath(job.getConfiguration(), output);

            return waitForCompletion(job);
        } catch (Exception e) {
            // fix: message used to say "error in CuboidJob" — a copy/paste leftover
            // from CuboidJob; name the job class that actually failed
            logger.error("error in FilterRecommendCuboidDataJob", e);
            printUsage(options);
            throw e;
        } finally {
            if (job != null)
                cleanupTempConfFile(job.getConfiguration());
        }
    }

}
*/

package org.apache.kylin.engine.mr.steps;

import static org.apache.kylin.engine.mr.JobBuilderSupport.PathNameCuboidBase;
import static org.apache.kylin.engine.mr.JobBuilderSupport.PathNameCuboidOld;

import java.io.IOException;
import java.util.Set;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.common.RowKeySplitter;
import org.apache.kylin.engine.mr.KylinMapper;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;

import com.google.common.base.Preconditions;

/**
 * Filters cuboid rows of the original segment: base cuboid rows are copied to
 * the base-cuboid output directory, rows belonging to a recommended cuboid are
 * copied to the "old" cuboid output directory, and all other rows are dropped.
 */
public class FilterRecommendCuboidDataMapper extends KylinMapper<Text, Text, Text, Text> {

    private MultipleOutputs mos;

    private RowKeySplitter rowKeySplitter;
    private long baseCuboid;
    // cuboid ids recommended for the optimized segment; rows of any other
    // non-base cuboid are filtered out
    private Set<Long> recommendCuboids;

    @Override
    protected void setup(Context context) throws IOException {
        super.bindCurrentConfiguration(context.getConfiguration());
        mos = new MultipleOutputs(context);

        final String cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME);
        final String segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);

        final KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();

        final CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
        final CubeSegment optSegment = cube.getSegmentById(segmentID);
        // row keys are decoded against the segment the optimized one derives from
        final CubeSegment originalSegment = cube.getOriginalSegmentToOptimize(optSegment);

        rowKeySplitter = new RowKeySplitter(originalSegment, 65, 255);
        baseCuboid = cube.getCuboidScheduler().getBaseCuboidId();

        recommendCuboids = cube.getCuboidsRecommend();
        Preconditions.checkNotNull(recommendCuboids, "The recommend cuboid map could not be null");
    }

    @Override
    public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException {
        final long cuboidID = rowKeySplitter.split(key.getBytes());

        if (cuboidID == baseCuboid) {
            // base cuboid rows always survive, into their dedicated directory
            mos.write(key, value, generateFileName(PathNameCuboidBase));
        } else if (recommendCuboids.contains(cuboidID)) {
            // recommended cuboids are kept under the "old" cuboid directory
            mos.write(key, value, generateFileName(PathNameCuboidOld));
        }
        // any other cuboid row is intentionally discarded
    }

    @Override
    public void doCleanup(Context context) throws IOException, InterruptedException {
        mos.close();

        // make sure the base-cuboid directory exists (with an empty sequence
        // file) even if this mapper saw no base cuboid rows, so downstream
        // steps always find the expected layout
        final Path outputDirBase = new Path(context.getConfiguration().get(FileOutputFormat.OUTDIR), PathNameCuboidBase);
        final FileSystem fs = FileSystem.get(context.getConfiguration());
        if (!fs.exists(outputDirBase)) {
            fs.mkdirs(outputDirBase);
            SequenceFile
                    .createWriter(context.getConfiguration(),
                            SequenceFile.Writer.file(new Path(outputDirBase, "part-m-00000")),
                            SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(Text.class))
                    .close();
        }
    }

    // MultipleOutputs base path: "<subDir>/part" yields part-m-NNNNN files per sub-directory
    private String generateFileName(String subDir) {
        return subDir + "/part";
    }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/0c4b3ad5/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidJob.java new file mode 100644 index 0000000..62109f4 --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidJob.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.kylin.engine.mr.steps;

import org.apache.commons.cli.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.CuboidModeEnum;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
import org.apache.kylin.engine.mr.ByteArrayWritable;
import org.apache.kylin.engine.mr.CubingJob;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.CuboidSchedulerUtil;
import org.apache.kylin.engine.mr.common.MapReduceUtil;
import org.apache.kylin.job.execution.ExecutableManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * MR job that builds cuboids with the in-memory algorithm, reading the
 * pre-built BASE CUBOID (rather than the flat table) as its input.
 */
public class InMemCuboidFromBaseCuboidJob extends AbstractHadoopJob {
    protected static final Logger logger = LoggerFactory.getLogger(InMemCuboidFromBaseCuboidJob.class);

    private boolean skipped = false;

    @Override
    public boolean isSkipped() {
        return skipped;
    }

    /**
     * This step only applies to the in-mem cubing algorithm; it is skipped when
     * the parent cubing job chose a different algorithm.
     */
    private boolean checkSkip(String cubingJobId) {
        if (cubingJobId == null)
            return false;

        ExecutableManager execMgr = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
        CubingJob cubingJob = (CubingJob) execMgr.getJob(cubingJobId);
        skipped = !cubingJob.isInMemCubing();
        return skipped;
    }

    @Override
    public int run(String[] args) throws Exception {
        Options options = new Options();

        try {
            options.addOption(OPTION_JOB_NAME);
            options.addOption(OPTION_CUBE_NAME);
            options.addOption(OPTION_SEGMENT_ID);
            options.addOption(OPTION_OUTPUT_PATH);
            options.addOption(OPTION_CUBING_JOB_ID);
            options.addOption(OPTION_INPUT_PATH);
            options.addOption(OPTION_CUBOID_MODE);
            parseOptions(options, args);

            String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
            String segmentID = getOptionValue(OPTION_SEGMENT_ID);
            String output = getOptionValue(OPTION_OUTPUT_PATH);

            CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
            CubeInstance cube = cubeMgr.getCube(cubeName);
            CubeSegment cubeSeg = cube.getSegmentById(segmentID);
            String cubingJobId = getOptionValue(OPTION_CUBING_JOB_ID);

            // cuboid mode defaults to CURRENT when not given on the command line
            String cuboidModeName = getOptionValue(OPTION_CUBOID_MODE);
            if (cuboidModeName == null) {
                cuboidModeName = CuboidModeEnum.CURRENT.toString();
            }

            CuboidScheduler cuboidScheduler = CuboidSchedulerUtil.getCuboidSchedulerByMode(cubeSeg, cuboidModeName);

            if (checkSkip(cubingJobId)) {
                logger.info("Skip job {} for {}", getOptionValue(OPTION_JOB_NAME), cubeSeg);
                return 0;
            }

            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
            logger.info("Starting: {}", job.getJobName());

            setJobClasspath(job, cube.getConfig());

            // add metadata to distributed cache
            attachSegmentMetadataWithAll(cubeSeg, job.getConfiguration());

            // set job configuration
            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
            job.getConfiguration().set(BatchConstants.CFG_CUBOID_MODE, cuboidModeName);

            String input = getOptionValue(OPTION_INPUT_PATH);
            FileInputFormat.setInputPaths(job, new Path(input));
            job.setInputFormatClass(SequenceFileInputFormat.class);

            // set mapper
            job.setMapperClass(InMemCuboidFromBaseCuboidMapper.class);
            job.setMapOutputKeyClass(ByteArrayWritable.class);
            job.setMapOutputValueClass(ByteArrayWritable.class);

            // set output
            job.setReducerClass(InMemCuboidFromBaseCuboidReducer.class);
            job.setNumReduceTasks(MapReduceUtil.getInmemCubingReduceTaskNum(cubeSeg, cuboidScheduler));

            // the cuboid file and KV class must be compatible with 0.7 version for smooth upgrade
            job.setOutputFormatClass(SequenceFileOutputFormat.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            Path outputPath = new Path(output);
            FileOutputFormat.setOutputPath(job, outputPath);

            HadoopUtil.deletePath(job.getConfiguration(), outputPath);

            return waitForCompletion(job);
        } catch (Exception e) {
            // fix: message used to say "error in CuboidJob" — a copy/paste leftover;
            // name the job class that actually failed
            logger.error("error in InMemCuboidFromBaseCuboidJob", e);
            printUsage(options);
            throw e;
        } finally {
            if (job != null)
                cleanupTempConfFile(job.getConfiguration());
        }
    }

    public static void main(String[] args) throws Exception {
        InMemCuboidFromBaseCuboidJob job = new InMemCuboidFromBaseCuboidJob();
        int exitCode = ToolRunner.run(job, args);
        System.exit(exitCode);
    }
}
*/

package org.apache.kylin.engine.mr.steps;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
import org.apache.kylin.cube.gridtable.CubeGridTable;
import org.apache.kylin.cube.inmemcubing.AbstractInMemCubeBuilder;
import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder;
import org.apache.kylin.cube.inmemcubing.InputConverterUnit;
import org.apache.kylin.cube.inmemcubing.InputConverterUnitForBaseCuboid;
import org.apache.kylin.cube.kv.CubeDimEncMap;
import org.apache.kylin.engine.mr.ByteArrayWritable;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.metadata.model.TblColRef;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * In-mem cubing mapper whose input records are pre-built base cuboid rows
 * (key/value {@link Text} pairs) instead of flat-table rows.
 */
public class InMemCuboidFromBaseCuboidMapper
        extends InMemCuboidMapperBase<Text, Text, ByteArrayWritable, ByteArrayWritable, ByteArray> {
    private static final Log logger = LogFactory.getLog(InMemCuboidFromBaseCuboidMapper.class);

    // scratch buffer used to concatenate a row-key body with its value;
    // sized from the base cuboid's max GT record length
    private ByteBuffer keyValueBuffer;
    // number of leading row-key bytes (the row key preamble) to strip from each key
    private int keyOffset;

    @Override
    protected void setup(Mapper.Context context) throws IOException {
        super.setup(context);

        long baseCuboid = Cuboid.getBaseCuboidId(cubeDesc);
        GTInfo gtInfo = CubeGridTable.newGTInfo(Cuboid.findForMandatory(cubeDesc, baseCuboid),
                new CubeDimEncMap(cubeDesc, dictionaryMap));
        keyValueBuffer = ByteBuffer.allocate(gtInfo.getMaxRecordLength());
        keyOffset = cubeSegment.getRowKeyPreambleSize();
    }

    @Override
    protected InputConverterUnit<ByteArray> getInputConverterUnit() {
        return new InputConverterUnitForBaseCuboid();
    }

    @Override
    protected Future getCubingThreadFuture(Context context, Map<TblColRef, Dictionary<String>> dictionaryMap,
            int reserveMemoryMB, CuboidScheduler cuboidScheduler, InputConverterUnit<ByteArray> inputConverterUnit) {
        AbstractInMemCubeBuilder cubeBuilder = new DoggedCubeBuilder(cuboidScheduler, flatDesc, dictionaryMap);
        cubeBuilder.setReserveMemoryMB(reserveMemoryMB);

        // single daemon thread runs the in-mem cube build while this mapper feeds the queue
        ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true)
                .setNameFormat("inmemory-cube-building-from-base-cuboid-mapper-%d").build());
        return executorService.submit(cubeBuilder.buildAsRunnable(queue, inputConverterUnit,
                new MapContextGTRecordWriter(context, cubeDesc, cubeSegment)));
    }

    @Override
    protected ByteArray getRecordFromKeyValue(Text key, Text value) {
        keyValueBuffer.clear();
        // fix: Text.getBytes() returns the BACKING array, which may be longer than
        // the valid content after the Text object is reused; the previous code used
        // getBytes().length and the full value array, which could append stale
        // trailing bytes. Always bound reads by Text.getLength().
        keyValueBuffer.put(key.getBytes(), keyOffset, key.getLength() - keyOffset);
        keyValueBuffer.put(value.getBytes(), 0, value.getLength());

        byte[] keyValue = new byte[keyValueBuffer.position()];
        System.arraycopy(keyValueBuffer.array(), 0, keyValue, 0, keyValueBuffer.position());

        return new ByteArray(keyValue);
    }

}
---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidReducer.java new file mode 100644 index 0000000..fbc45d9 --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidReducer.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.engine.mr.steps; + +public class InMemCuboidFromBaseCuboidReducer extends InMemCuboidReducer { + //pass +} http://git-wip-us.apache.org/repos/asf/kylin/blob/0c4b3ad5/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java index 73a2eb9..b0ea7b7 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java @@ -93,7 +93,7 @@ public class InMemCuboidJob extends AbstractHadoopJob { setJobClasspath(job, cube.getConfig()); // add metadata to distributed cache - attachSegmentMetadataWithDict(segment, job.getConfiguration()); + attachSegmentMetadataWithAll(segment, job.getConfiguration()); // set job configuration job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName); @@ -116,7 +116,7 @@ public class InMemCuboidJob extends AbstractHadoopJob { // set output IMROutput2.IMROutputFormat outputFormat = MRUtil.getBatchCubingOutputSide2(segment).getOuputFormat(); - outputFormat.configureJobOutput(job, output, segment, 0); + outputFormat.configureJobOutput(job, output, segment, segment.getCuboidScheduler(), 0); return waitForCompletion(job); } finally { http://git-wip-us.apache.org/repos/asf/kylin/blob/0c4b3ad5/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java index 0642552..3dc95fa 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java +++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java @@ -33,8 +33,6 @@ import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.CuboidScheduler; -import org.apache.kylin.cube.cuboid.DefaultCuboidScheduler; -import org.apache.kylin.cube.cuboid.TreeCuboidSchedulerManager; import org.apache.kylin.cube.inmemcubing.ConsumeBlockingQueueController; import org.apache.kylin.cube.inmemcubing.InputConverterUnit; import org.apache.kylin.cube.model.CubeDesc; @@ -43,7 +41,7 @@ import org.apache.kylin.engine.EngineFactory; import org.apache.kylin.engine.mr.KylinMapper; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; -import org.apache.kylin.engine.mr.common.CuboidStatsReaderUtil; +import org.apache.kylin.engine.mr.common.CuboidSchedulerUtil; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; @@ -113,11 +111,7 @@ public abstract class InMemCuboidMapperBase<KEYIN, VALUEIN, KEYOUT, VALUEOUT, T> } String cuboidModeName = conf.get(BatchConstants.CFG_CUBOID_MODE); - CuboidScheduler cuboidScheduler = TreeCuboidSchedulerManager.getTreeCuboidScheduler(cubeDesc, // - CuboidStatsReaderUtil.readCuboidStatsFromSegment(cube.getCuboidsByMode(cuboidModeName), cubeSegment)); - if (cuboidScheduler == null) { - cuboidScheduler = new DefaultCuboidScheduler(cubeDesc); - } + CuboidScheduler cuboidScheduler = CuboidSchedulerUtil.getCuboidSchedulerByMode(cubeSegment, cuboidModeName); reserveMemoryMB = calculateReserveMB(conf); inputConverterUnit = getInputConverterUnit(); http://git-wip-us.apache.org/repos/asf/kylin/blob/0c4b3ad5/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java ---------------------------------------------------------------------- diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java index 2058bc9..60d0870 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java @@ -95,7 +95,7 @@ public abstract class KVGTRecordWriter implements ICuboidWriter { protected abstract void writeAsKeyValue(ByteArrayWritable key, ByteArrayWritable value) throws IOException; private void initVariables(Long cuboidId) { - rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, Cuboid.findById(cubeSegment, cuboidId)); + rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, Cuboid.findForMandatory(cubeDesc, cuboidId)); keyBuf = rowKeyEncoder.createBuf(); dimensions = Long.bitCount(cuboidId); http://git-wip-us.apache.org/repos/asf/kylin/blob/0c4b3ad5/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java index c80283e..ef3adad 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java @@ -94,7 +94,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> { protected void setup(Context context) throws IOException, InterruptedException { super.bindCurrentConfiguration(context.getConfiguration()); - cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase(); + cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME); segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID); config = 
AbstractHadoopJob.loadKylinPropsAndMetadata(); http://git-wip-us.apache.org/repos/asf/kylin/blob/0c4b3ad5/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java new file mode 100644 index 0000000..cf6e249 --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.engine.mr.steps; + +import java.io.IOException; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.ResourceStore; +import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.engine.mr.CubingJob; +import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.engine.mr.common.CubeStatsReader; +import org.apache.kylin.engine.mr.common.CubeStatsWriter; +import org.apache.kylin.engine.mr.common.StatisticsDecisionUtil; +import org.apache.kylin.job.exception.ExecuteException; +import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.ExecutableContext; +import org.apache.kylin.job.execution.ExecuteResult; +import org.apache.kylin.measure.hllc.HLLCounter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; + +public class MergeStatisticsWithOldStep extends AbstractExecutable { + + private static final Logger logger = LoggerFactory.getLogger(MergeStatisticsWithOldStep.class); + + protected Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap(); + + public MergeStatisticsWithOldStep() { + super(); + } + + @Override + protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { + final CubeManager mgr = CubeManager.getInstance(context.getConfig()); + final CubeInstance cube = mgr.getCube(CubingExecutableUtil.getCubeName(this.getParams())); + final CubeSegment optimizeSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams())); + final String statsInputPath 
= CubingExecutableUtil.getStatisticsPath(this.getParams()); + + CubeSegment oldSegment = optimizeSegment.getCubeInstance().getOriginalSegmentToOptimize(optimizeSegment); + Preconditions.checkNotNull(oldSegment, + "cannot find the original segment to be optimized by " + optimizeSegment); + + KylinConfig kylinConf = cube.getConfig(); + Configuration conf = HadoopUtil.getCurrentConfiguration(); + ResourceStore rs = ResourceStore.getStore(kylinConf); + int averageSamplingPercentage = 0; + + try { + //1. Add statistics from optimized segment + Path statisticsFilePath = new Path(statsInputPath, + BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME); + FileSystem hdfs = FileSystem.get(conf); + if (!hdfs.exists(statisticsFilePath)) + throw new IOException("File " + statisticsFilePath + " does not exists"); + + CubeStatsReader optimizeSegmentStatsReader = new CubeStatsReader(optimizeSegment, null, + optimizeSegment.getConfig(), statisticsFilePath); + averageSamplingPercentage += optimizeSegmentStatsReader.getSamplingPercentage(); + addFromCubeStatsReader(optimizeSegmentStatsReader); + + //2. Add statistics from old segment + CubeStatsReader oldSegmentStatsReader = new CubeStatsReader(oldSegment, null, oldSegment.getConfig()); + averageSamplingPercentage += oldSegmentStatsReader.getSamplingPercentage(); + addFromCubeStatsReader(oldSegmentStatsReader); + + logger.info("Cuboid set with stats info: " + cuboidHLLMap.keySet().toString()); + //3. 
Store merged statistics for recommend cuboids + averageSamplingPercentage = averageSamplingPercentage / 2; + Set<Long> cuboidsRecommend = cube.getCuboidsRecommend(); + + Map<Long, HLLCounter> resultCuboidHLLMap = Maps.newHashMapWithExpectedSize(cuboidsRecommend.size()); + for (Long cuboid : cuboidsRecommend) { + HLLCounter hll = cuboidHLLMap.get(cuboid); + if (hll == null) { + logger.warn("Cannot get the row count stats for cuboid " + cuboid); + } else { + resultCuboidHLLMap.put(cuboid, hll); + } + } + + String resultDir = CubingExecutableUtil.getMergedStatisticsPath(this.getParams()); + CubeStatsWriter.writeCuboidStatistics(conf, new Path(resultDir), resultCuboidHLLMap, + averageSamplingPercentage); + + try (FSDataInputStream mergedStats = hdfs + .open(new Path(resultDir, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME))) { + // put the statistics to metadata store + String statisticsFileName = optimizeSegment.getStatisticsResourcePath(); + rs.putResource(statisticsFileName, mergedStats, System.currentTimeMillis()); + } + + //By default, the cube optimization will use in-memory cubing + CubingJob cubingJob = (CubingJob) getManager() + .getJob(CubingExecutableUtil.getCubingJobId(this.getParams())); + StatisticsDecisionUtil.decideCubingAlgorithm(cubingJob, optimizeSegment); + + return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed"); + } catch (IOException e) { + logger.error("fail to merge cuboid statistics", e); + return new ExecuteResult(e, e.getLocalizedMessage()); + } + + } + + private void addFromCubeStatsReader(CubeStatsReader cubeStatsReader) { + for (Map.Entry<Long, HLLCounter> entry : cubeStatsReader.getCuboidRowEstimatesHLLOrigin().entrySet()) { + if (cuboidHLLMap.get(entry.getKey()) != null) { + cuboidHLLMap.get(entry.getKey()).merge(entry.getValue()); + } else { + cuboidHLLMap.put(entry.getKey(), entry.getValue()); + } + } + } + +} 
http://git-wip-us.apache.org/repos/asf/kylin/blob/0c4b3ad5/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java index 6680fd7..646c74f 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java @@ -35,6 +35,7 @@ import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.engine.mr.KylinMapper; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.engine.mr.common.CuboidSchedulerUtil; import org.apache.kylin.engine.mr.common.NDCuboidBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,8 +51,8 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> { private Text outputKey = new Text(); private String cubeName; private String segmentID; - private CubeSegment cubeSegment; private CubeDesc cubeDesc; + private CubeSegment cubeSegment; private CuboidScheduler cuboidScheduler; private int handleCounter; @@ -65,17 +66,18 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> { protected void setup(Context context) throws IOException { super.bindCurrentConfiguration(context.getConfiguration()); - cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase(); + cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME); segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID); + String cuboidModeName = context.getConfiguration().get(BatchConstants.CFG_CUBOID_MODE); KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(); CubeInstance cube = 
CubeManager.getInstance(config).getCube(cubeName); - cubeSegment = cube.getSegmentById(segmentID); cubeDesc = cube.getDescriptor(); + cubeSegment = cube.getSegmentById(segmentID); ndCuboidBuilder = new NDCuboidBuilder(cubeSegment); // initialize CubiodScheduler - cuboidScheduler = cubeSegment.getCuboidScheduler(); + cuboidScheduler = CuboidSchedulerUtil.getCuboidSchedulerByMode(cubeSegment, cuboidModeName); rowKeySplitter = new RowKeySplitter(cubeSegment, 65, 256); } @@ -104,7 +106,7 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> { } for (Long child : myChildren) { - Cuboid childCuboid = Cuboid.findById(cuboidScheduler, child); + Cuboid childCuboid = Cuboid.findForMandatory(cubeDesc, child); Pair<Integer, ByteArray> result = ndCuboidBuilder.buildKey(parentCuboid, childCuboid, rowKeySplitter.getSplitBuffers()); outputKey.set(result.getSecond().array(), 0, result.getFirst()); context.write(outputKey, value); http://git-wip-us.apache.org/repos/asf/kylin/blob/0c4b3ad5/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ReducerNumSizing.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ReducerNumSizing.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ReducerNumSizing.java deleted file mode 100644 index 5c0555a..0000000 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ReducerNumSizing.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.engine.mr.steps; - -import java.io.IOException; -import java.util.Map; - -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.engine.mr.common.CubeStatsReader; -import org.apache.kylin.job.exception.JobException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ReducerNumSizing { - - private static final Logger logger = LoggerFactory.getLogger(ReducerNumSizing.class); - - public static int getLayeredCubingReduceTaskNum(CubeSegment cubeSegment, double totalMapInputMB, int level) throws ClassNotFoundException, IOException, InterruptedException, JobException { - CubeDesc cubeDesc = cubeSegment.getCubeDesc(); - KylinConfig kylinConfig = cubeDesc.getConfig(); - - double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB(); - double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio(); - logger.info("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio + ", level " + level); - - CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, kylinConfig); - - double parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst; - - if (level == -1) { - //merge case - double estimatedSize = cubeStatsReader.estimateCubeSize(); - adjustedCurrentLayerSizeEst = estimatedSize > totalMapInputMB ? 
totalMapInputMB : estimatedSize; - logger.debug("estimated size {}, input size {}, adjustedCurrentLayerSizeEst: {}", estimatedSize, totalMapInputMB, adjustedCurrentLayerSizeEst); - } else if (level == 0) { - //base cuboid case TODO: the estimation could be very WRONG because it has no correction - adjustedCurrentLayerSizeEst = cubeStatsReader.estimateLayerSize(0); - logger.debug("adjustedCurrentLayerSizeEst: {}", adjustedCurrentLayerSizeEst); - } else { - parentLayerSizeEst = cubeStatsReader.estimateLayerSize(level - 1); - currentLayerSizeEst = cubeStatsReader.estimateLayerSize(level); - adjustedCurrentLayerSizeEst = totalMapInputMB / parentLayerSizeEst * currentLayerSizeEst; - logger.debug("totalMapInputMB: {}, parentLayerSizeEst: {}, currentLayerSizeEst: {}, adjustedCurrentLayerSizeEst: {}", totalMapInputMB, parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst); - } - - // number of reduce tasks - int numReduceTasks = (int) Math.round(adjustedCurrentLayerSizeEst / perReduceInputMB * reduceCountRatio + 0.99); - - // adjust reducer number for cube which has DISTINCT_COUNT measures for better performance - if (cubeDesc.hasMemoryHungryMeasures()) { - logger.debug("Multiply reducer num by 4 to boost performance for memory hungry measures"); - numReduceTasks = numReduceTasks * 4; - } - - // at least 1 reducer by default - numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks); - // no more than 500 reducer by default - numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks); - - return numReduceTasks; - } - - public static int getInmemCubingReduceTaskNum(CubeSegment cubeSeg) throws IOException { - KylinConfig kylinConfig = cubeSeg.getConfig(); - - Map<Long, Double> cubeSizeMap = new CubeStatsReader(cubeSeg, kylinConfig).getCuboidSizeMap(); - double totalSizeInM = 0; - for (Double cuboidSize : cubeSizeMap.values()) { - totalSizeInM += cuboidSize; - } - - double perReduceInputMB = 
kylinConfig.getDefaultHadoopJobReducerInputMB(); - - // number of reduce tasks - int numReduceTasks = (int) Math.round(totalSizeInM / perReduceInputMB); - - // at least 1 reducer by default - numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks); - // no more than 500 reducer by default - numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks); - - logger.info("Having total map input MB " + Math.round(totalSizeInM)); - logger.info("Having per reduce MB " + perReduceInputMB); - logger.info("Setting " + Reducer.Context.NUM_REDUCES + "=" + numReduceTasks); - return numReduceTasks; - } -}