KYLIN-2136 Enhance cubing algorithm selection
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/a0cfaa1b Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/a0cfaa1b Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/a0cfaa1b Branch: refs/heads/KYLIN-1971 Commit: a0cfaa1bcba920b98b663f19e4df8142019d5965 Parents: 5922830 Author: Li Yang <liy...@apache.org> Authored: Fri Oct 28 11:34:14 2016 +0800 Committer: Li Yang <liy...@apache.org> Committed: Fri Oct 28 14:52:05 2016 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/KylinConfigBase.java | 6 +- .../kylin/engine/mr/common/CubeStatsReader.java | 13 +++- .../kylin/engine/mr/common/CubeStatsWriter.java | 76 ++++++++++++++++++++ .../kylin/engine/mr/common/CuboidStatsUtil.java | 73 ------------------- .../mr/steps/FactDistinctColumnsReducer.java | 9 ++- .../engine/mr/steps/MergeStatisticsStep.java | 4 +- .../engine/mr/steps/SaveStatisticsStep.java | 17 +++-- .../steps/FactDistinctColumnsReducerTest.java | 4 +- 8 files changed, 115 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/a0cfaa1b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 8744296..47f9878 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -424,9 +424,13 @@ abstract public class KylinConfigBase implements Serializable { } public double getCubeAlgorithmAutoThreshold() { - return Double.parseDouble(getOptional("kylin.cube.algorithm.auto.threshold", "8")); + return Double.parseDouble(getOptional("kylin.cube.algorithm.auto.threshold", "7")); } + public int getCubeAlgorithmAutoMapperLimit() { + return Integer.parseInt(getOptional("kylin.cube.algorithm.auto.mapper.limit", "500")); + } + @Deprecated public int getCubeAggrGroupMaxSize() { return Integer.parseInt(getOptional("kylin.cube.aggrgroup.max.size", "12")); http://git-wip-us.apache.org/repos/asf/kylin/blob/a0cfaa1b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java index 83e46e3..2dcb268 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java @@ -73,7 +73,8 @@ public class CubeStatsReader { final CubeSegment seg; final int samplingPercentage; - final double mapperOverlapRatioOfFirstBuild; // only makes sense for the first build, is meaningless after merge + final int mapperNumberOfFirstBuild; // becomes meaningless after merge + final double mapperOverlapRatioOfFirstBuild; // becomes meaningless after merge final Map<Long, HyperLogLogPlusCounter> cuboidRowEstimatesHLL; public CubeStatsReader(CubeSegment cubeSegment, KylinConfig kylinConfig) throws IOException { @@ -90,6 +91,7 @@ public class CubeStatsReader { reader = new SequenceFile.Reader(hadoopConf, seqInput); int percentage = 100; + int mapperNumber = 0; double mapperOverlapRatio = 0; Map<Long, HyperLogLogPlusCounter> counterMap = Maps.newHashMap(); @@ -100,7 +102,9 @@ public class CubeStatsReader { percentage = Bytes.toInt(value.getBytes()); } else if (key.get() == -1) { mapperOverlapRatio = Bytes.toDouble(value.getBytes()); - } else { + } else if (key.get() == -2) { + mapperNumber = Bytes.toInt(value.getBytes()); + } else if (key.get() > 0) { HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(kylinConfig.getCubeStatsHLLPrecision()); ByteArray byteArray = new ByteArray(value.getBytes()); hll.readRegisters(byteArray.asBuffer()); @@ -110,6 +114,7 @@ public class CubeStatsReader { this.seg = cubeSegment; this.samplingPercentage = percentage; + this.mapperNumberOfFirstBuild = mapperNumber; this.mapperOverlapRatioOfFirstBuild = mapperOverlapRatio; this.cuboidRowEstimatesHLL = counterMap; @@ -141,6 +146,10 @@ public class CubeStatsReader { return getCuboidSizeMapFromRowCount(seg, getCuboidRowEstimatesHLL()); } + public int getMapperNumberOfFirstBuild() { + return mapperNumberOfFirstBuild; + } + public double getMapperOverlapRatioOfFirstBuild() { return mapperOverlapRatioOfFirstBuild; } http://git-wip-us.apache.org/repos/asf/kylin/blob/a0cfaa1b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java new file mode 100644 index 0000000..74a2107 --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.engine.mr.common; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.measure.BufferedMeasureCodec; +import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; + +public class CubeStatsWriter { + + public static void writeCuboidStatistics(Configuration conf, Path outputPath, // + Map<Long, HyperLogLogPlusCounter> cuboidHLLMap, int samplingPercentage) throws IOException { + writeCuboidStatistics(conf, outputPath, cuboidHLLMap, samplingPercentage, 0, 0); + } + + public static void writeCuboidStatistics(Configuration conf, Path outputPath, // + Map<Long, HyperLogLogPlusCounter> cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio) throws IOException { + Path seqFilePath = new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME); + + List<Long> allCuboids = new ArrayList<Long>(); + allCuboids.addAll(cuboidHLLMap.keySet()); + Collections.sort(allCuboids); + + ByteBuffer valueBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE); + SequenceFile.Writer writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(seqFilePath), SequenceFile.Writer.keyClass(LongWritable.class), SequenceFile.Writer.valueClass(BytesWritable.class)); + try { + // mapper overlap ratio at key -1 + writer.append(new LongWritable(-1), new BytesWritable(Bytes.toBytes(mapperOverlapRatio))); + + // mapper number at key -2 + writer.append(new LongWritable(-2), new BytesWritable(Bytes.toBytes(mapperNumber))); + + // sampling percentage at key 0 + writer.append(new LongWritable(0L), new BytesWritable(Bytes.toBytes(samplingPercentage))); + + for (long i : allCuboids) { + valueBuf.clear(); + cuboidHLLMap.get(i).writeRegisters(valueBuf); + valueBuf.flip(); + writer.append(new LongWritable(i), new BytesWritable(valueBuf.array(), valueBuf.limit())); + } + } finally { + IOUtils.closeQuietly(writer); + } + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/a0cfaa1b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java deleted file mode 100644 index d5b4b0d..0000000 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.engine.mr.common; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; - -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.measure.BufferedMeasureCodec; -import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; - -public class CuboidStatsUtil { - - public static void writeCuboidStatistics(Configuration conf, Path outputPath, // - Map<Long, HyperLogLogPlusCounter> cuboidHLLMap, int samplingPercentage) throws IOException { - writeCuboidStatistics(conf, outputPath, cuboidHLLMap, samplingPercentage, 0); - } - - public static void writeCuboidStatistics(Configuration conf, Path outputPath, // - Map<Long, HyperLogLogPlusCounter> cuboidHLLMap, int samplingPercentage, double mapperOverlapRatio) throws IOException { - Path seqFilePath = new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME); - - List<Long> allCuboids = new ArrayList<Long>(); - allCuboids.addAll(cuboidHLLMap.keySet()); - Collections.sort(allCuboids); - - ByteBuffer valueBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE); - SequenceFile.Writer writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(seqFilePath), SequenceFile.Writer.keyClass(LongWritable.class), SequenceFile.Writer.valueClass(BytesWritable.class)); - try { - // mapper overlap ratio at key -1 - writer.append(new LongWritable(-1), new BytesWritable(Bytes.toBytes(mapperOverlapRatio))); - - // sampling percentage at key 0 - writer.append(new LongWritable(0L), new BytesWritable(Bytes.toBytes(samplingPercentage))); - - for (long i : allCuboids) { - valueBuf.clear(); - cuboidHLLMap.get(i).writeRegisters(valueBuf); - valueBuf.flip(); - writer.append(new LongWritable(i), new BytesWritable(valueBuf.array(), valueBuf.limit())); - } - } finally { - IOUtils.closeQuietly(writer); - } - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/a0cfaa1b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java index 2889ba8..c8624bb 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java @@ -41,7 +41,7 @@ import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.engine.mr.KylinReducer; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; -import org.apache.kylin.engine.mr.common.CuboidStatsUtil; +import org.apache.kylin.engine.mr.common.CubeStatsWriter; import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; @@ -189,10 +189,12 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri grandTotal += hll.getCountEstimate(); } double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal; + + int mapperNumber = baseCuboidRowCountInMappers.size(); writeMapperAndCuboidStatistics(context); // for human check - CuboidStatsUtil.writeCuboidStatistics(context.getConfiguration(), new Path(statisticsOutput), // - cuboidHLLMap, samplingPercentage, mapperOverlapRatio); // for CreateHTableJob + CubeStatsWriter.writeCuboidStatistics(context.getConfiguration(), new Path(statisticsOutput), // + cuboidHLLMap, samplingPercentage, mapperNumber, mapperOverlapRatio); } } @@ -214,6 +216,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri writeLine(out, msg); writeLine(out, "The following statistics are collected based on sampling data."); + writeLine(out, "Number of Mappers: " + baseCuboidRowCountInMappers.size()); for (int i = 0; i < baseCuboidRowCountInMappers.size(); i++) { if (baseCuboidRowCountInMappers.get(i) > 0) { msg = "Base Cuboid in Mapper " + i + " row count: \t " + baseCuboidRowCountInMappers.get(i); http://git-wip-us.apache.org/repos/asf/kylin/blob/a0cfaa1b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java index c774cd6..88f6ba2 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java @@ -42,7 +42,7 @@ import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.mr.HadoopUtil; import org.apache.kylin.engine.mr.common.BatchConstants; -import org.apache.kylin.engine.mr.common.CuboidStatsUtil; +import org.apache.kylin.engine.mr.common.CubeStatsWriter; import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableContext; @@ -121,7 +121,7 @@ public class MergeStatisticsStep extends AbstractExecutable { } } averageSamplingPercentage = averageSamplingPercentage / CubingExecutableUtil.getMergingSegmentIds(this.getParams()).size(); - CuboidStatsUtil.writeCuboidStatistics(conf, new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams())), cuboidHLLMap, averageSamplingPercentage); + CubeStatsWriter.writeCuboidStatistics(conf, new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams())), cuboidHLLMap, averageSamplingPercentage); Path statisticsFilePath = new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams()), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME); FileSystem fs = statisticsFilePath.getFileSystem(conf); FSDataInputStream is = fs.open(statisticsFilePath); http://git-wip-us.apache.org/repos/asf/kylin/blob/a0cfaa1b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java index 3cace64..8777af7 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java @@ -104,10 +104,19 @@ public class SaveStatisticsStep extends AbstractExecutable { } else if ("random".equalsIgnoreCase(algPref)) { // for testing alg = new Random().nextBoolean() ? AlgorithmEnum.INMEM : AlgorithmEnum.LAYER; } else { // the default - double threshold = kylinConf.getCubeAlgorithmAutoThreshold(); - double mapperOverlapRatio = new CubeStatsReader(seg, kylinConf).getMapperOverlapRatioOfFirstBuild(); - logger.info("mapperOverlapRatio for " + seg + " is " + mapperOverlapRatio + " and threshold is " + threshold); - alg = mapperOverlapRatio < threshold ? AlgorithmEnum.INMEM : AlgorithmEnum.LAYER; + CubeStatsReader cubeStats = new CubeStatsReader(seg, kylinConf); + int mapperNumber = cubeStats.getMapperNumberOfFirstBuild(); + int mapperNumLimit = kylinConf.getCubeAlgorithmAutoMapperLimit(); + double mapperOverlapRatio = cubeStats.getMapperOverlapRatioOfFirstBuild(); + double overlapThreshold = kylinConf.getCubeAlgorithmAutoThreshold(); + logger.info("mapperNumber for " + seg + " is " + mapperNumber + " and threshold is " + mapperNumLimit); + logger.info("mapperOverlapRatio for " + seg + " is " + mapperOverlapRatio + " and threshold is " + overlapThreshold); + + // in-mem cubing is good when + // 1) the cluster has enough mapper slots to run in parallel + // 2) the mapper overlap ratio is small, meaning the shuffle of in-mem MR has advantage + alg = (mapperNumber <= mapperNumLimit && mapperOverlapRatio <= overlapThreshold)// + ? AlgorithmEnum.INMEM : AlgorithmEnum.LAYER; } } http://git-wip-us.apache.org/repos/asf/kylin/blob/a0cfaa1b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java index cbbaf38..ca8684f 100644 --- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java @@ -27,7 +27,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.kylin.engine.mr.HadoopUtil; -import org.apache.kylin.engine.mr.common.CuboidStatsUtil; +import org.apache.kylin.engine.mr.common.CubeStatsWriter; import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; import org.junit.Test; @@ -49,7 +49,7 @@ public class FactDistinctColumnsReducerTest { System.out.println(outputPath); Map<Long, HyperLogLogPlusCounter> cuboidHLLMap = Maps.newHashMap(); - CuboidStatsUtil.writeCuboidStatistics(conf, outputPath, cuboidHLLMap, 100); + CubeStatsWriter.writeCuboidStatistics(conf, outputPath, cuboidHLLMap, 100); FileSystem.getLocal(conf).delete(outputPath, true); }