Repository: kylin Updated Branches: refs/heads/KYLIN-2246 [created] 815887e73
KYLIN-2246 redesign the way to decide layer cubing reducer count Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/815887e7 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/815887e7 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/815887e7 Branch: refs/heads/KYLIN-2246 Commit: 815887e73a5c3b0852b6cf5650400235797d7ce9 Parents: 59a30f6 Author: Hongbin Ma <mahong...@apache.org> Authored: Mon Dec 5 21:02:36 2016 +0800 Committer: Hongbin Ma <mahong...@apache.org> Committed: Mon Dec 5 21:02:44 2016 +0800 ---------------------------------------------------------------------- .../kylin/cube/cuboid/CuboidScheduler.java | 31 +++++++- .../kylin/engine/mr/common/CubeStatsReader.java | 26 ++++++- .../apache/kylin/engine/mr/steps/CuboidJob.java | 52 +------------ .../engine/mr/steps/LayerReduerNumSizing.java | 82 ++++++++++++++++++++ .../kylin/engine/mr/steps/MergeCuboidJob.java | 2 +- 5 files changed, 138 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/815887e7/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java index bd6a37a..733aded 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java @@ -18,7 +18,7 @@ package org.apache.kylin.cube.cuboid; -/** +/** */ import java.util.Collections; @@ -31,6 +31,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.kylin.cube.model.AggregationGroup; import org.apache.kylin.cube.model.CubeDesc; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -39,6 +40,7 @@ public class CuboidScheduler { private final CubeDesc cubeDesc; private final long max; private final Map<Long, List<Long>> cache; + private List<List<Long>> cuboidsByLayer; public CuboidScheduler(CubeDesc cubeDesc) { this.cubeDesc = cubeDesc; @@ -232,4 +234,31 @@ public class CuboidScheduler { getSubCuboidIds(cuboidId, result); } } + + public List<List<Long>> getCuboidsByLayer() { + if (cuboidsByLayer != null) { + return cuboidsByLayer; + } + + int totalNum = 0; + int layerNum = cubeDesc.getBuildLevel(); + cuboidsByLayer = Lists.newArrayList(); + + cuboidsByLayer.add(Collections.singletonList(Cuboid.getBaseCuboidId(cubeDesc))); + totalNum++; + + for (int i = 1; i <= layerNum; i++) { + List<Long> lastLayer = cuboidsByLayer.get(i - 1); + List<Long> newLayer = Lists.newArrayList(); + for (Long parent : lastLayer) { + newLayer.addAll(getSpanningCuboid(parent)); + } + cuboidsByLayer.add(newLayer); + totalNum += newLayer.size(); + } + + int size = getAllCuboidIds().size(); + Preconditions.checkState(totalNum == size, "total Num: " + totalNum + " actual size: " + size); + return cuboidsByLayer; + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/815887e7/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java index c917cfb..1cf5da6 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java @@ -29,6 +29,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; @@ -75,9 +76,11 @@ public class CubeStatsReader { final int mapperNumberOfFirstBuild; // becomes meaningless after merge final double mapperOverlapRatioOfFirstBuild; // becomes meaningless after merge final Map<Long, HyperLogLogPlusCounter> cuboidRowEstimatesHLL; + final CuboidScheduler cuboidScheduler; public CubeStatsReader(CubeSegment cubeSegment, KylinConfig kylinConfig) throws IOException { ResourceStore store = ResourceStore.getStore(kylinConfig); + cuboidScheduler = new CuboidScheduler(cubeSegment.getCubeDesc()); String statsKey = cubeSegment.getStatisticsResourcePath(); File tmpSeqFile = writeTmpSeqFile(store.getResource(statsKey).inputStream); Reader reader = null; @@ -145,6 +148,10 @@ public class CubeStatsReader { return getCuboidSizeMapFromRowCount(seg, getCuboidRowEstimatesHLL()); } + public double estimateCubeSize() { + return SumHelper.sumDouble(getCuboidSizeMap().values()); + } + public int getMapperNumberOfFirstBuild() { return mapperNumberOfFirstBuild; } @@ -248,12 +255,23 @@ public class CubeStatsReader { out.println("----------------------------------------------------------------------------"); } + //return MB + public double estimateLayerSize(int level) { + List<List<Long>> layeredCuboids = cuboidScheduler.getCuboidsByLayer(); + Map<Long, Double> cuboidSizeMap = getCuboidSizeMap(); + double ret = 0; + for (Long cuboidId : layeredCuboids.get(level)) { + ret += cuboidSizeMap.get(cuboidId); + } + + logger.info("Estimating size for layer {}, all cuboids are {}, total size is {}", level, StringUtils.join(layeredCuboids.get(level), ","), ret); + return ret; + } + private void printCuboidInfoTreeEntry(Map<Long, Long> cuboidRows, Map<Long, Double> cuboidSizes, PrintWriter out) { - CubeDesc cubeDesc = seg.getCubeDesc(); - CuboidScheduler scheduler = new CuboidScheduler(cubeDesc); - long baseCuboid = Cuboid.getBaseCuboidId(cubeDesc); + long baseCuboid = Cuboid.getBaseCuboidId(seg.getCubeDesc()); int dimensionCount = Long.bitCount(baseCuboid); - printCuboidInfoTree(-1L, baseCuboid, scheduler, cuboidRows, cuboidSizes, dimensionCount, 0, out); + printCuboidInfoTree(-1L, baseCuboid, cuboidScheduler, cuboidRows, cuboidSizes, dimensionCount, 0, out); } private void printKVInfo(PrintWriter writer) { http://git-wip-us.apache.org/repos/asf/kylin/blob/815887e7/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java index ddd21b7..d3cb494 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java @@ -25,7 +25,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.Reducer.Context; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; @@ -35,14 +34,11 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.cuboid.CuboidCLI; -import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.engine.mr.CubingJob; import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat; import org.apache.kylin.engine.mr.MRUtil; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; -import org.apache.kylin.job.exception.JobException; import org.apache.kylin.job.execution.ExecutableManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -100,6 +96,7 @@ public class CuboidJob extends AbstractHadoopJob { CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); CubeInstance cube = cubeMgr.getCube(cubeName); + CubeSegment segment = cube.getSegmentById(segmentID); if (checkSkip(cubingJobId)) { logger.info("Skip job " + getOptionValue(OPTION_JOB_NAME) + " for " + segmentID + "[" + segmentID + "]"); @@ -113,7 +110,7 @@ public class CuboidJob extends AbstractHadoopJob { setJobClasspath(job, cube.getConfig()); // Mapper - configureMapperInputFormat(cube.getSegmentById(segmentID)); + configureMapperInputFormat(segment); job.setMapperClass(this.mapperClass); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); @@ -134,7 +131,7 @@ public class CuboidJob extends AbstractHadoopJob { // add metadata to distributed cache attachKylinPropsAndMetadata(cube, job.getConfiguration()); - setReduceTaskNum(job, cube.getDescriptor(), nCuboidLevel); + LayerReduerNumSizing.setReduceTaskNum(job, segment, getTotalMapInputMB(), nCuboidLevel); this.deletePath(job.getConfiguration(), output); @@ -163,49 +160,6 @@ public class CuboidJob extends AbstractHadoopJob { } } - protected void setReduceTaskNum(Job job, CubeDesc cubeDesc, int level) throws ClassNotFoundException, IOException, InterruptedException, JobException { - KylinConfig kylinConfig = cubeDesc.getConfig(); - - double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB(); - double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio(); - - // total map input MB - double totalMapInputMB = this.getTotalMapInputMB(); - - // output / input ratio - int preLevelCuboids, thisLevelCuboids; - if (level == 0) { // base cuboid - preLevelCuboids = thisLevelCuboids = 1; - } else { // n-cuboid - int[] allLevelCount = CuboidCLI.calculateAllLevelCount(cubeDesc); - preLevelCuboids = allLevelCount[level - 1]; - thisLevelCuboids = allLevelCount[level]; - } - - // total reduce input MB - double totalReduceInputMB = totalMapInputMB * thisLevelCuboids / preLevelCuboids; - - // number of reduce tasks - int numReduceTasks = (int) Math.round(totalReduceInputMB / perReduceInputMB * reduceCountRatio); - - // adjust reducer number for cube which has DISTINCT_COUNT measures for better performance - if (cubeDesc.hasMemoryHungryMeasures()) { - numReduceTasks = numReduceTasks * 4; - } - - // at least 1 reducer by default - numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks); - // no more than 500 reducer by default - numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks); - - job.setNumReduceTasks(numReduceTasks); - - logger.info("Having total map input MB " + Math.round(totalMapInputMB)); - logger.info("Having level " + level + ", pre-level cuboids " + preLevelCuboids + ", this level cuboids " + thisLevelCuboids); - logger.info("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio); - logger.info("Setting " + Context.NUM_REDUCES + "=" + numReduceTasks); - } - /** * @param mapperClass * the mapperClass to set http://git-wip-us.apache.org/repos/asf/kylin/blob/815887e7/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReduerNumSizing.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReduerNumSizing.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReduerNumSizing.java new file mode 100644 index 0000000..6bddcbd --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReduerNumSizing.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.engine.mr.steps; + +import java.io.IOException; + +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.engine.mr.common.CubeStatsReader; +import org.apache.kylin.job.exception.JobException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LayerReduerNumSizing { + + private static final Logger logger = LoggerFactory.getLogger(LayerReduerNumSizing.class); + + public static void setReduceTaskNum(Job job, CubeSegment cubeSegment, double totalMapInputMB, int level) throws ClassNotFoundException, IOException, InterruptedException, JobException { + CubeDesc cubeDesc = cubeSegment.getCubeDesc(); + KylinConfig kylinConfig = cubeDesc.getConfig(); + + double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB(); + double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio(); + logger.info("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio + ", level " + level); + + CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, kylinConfig); + + double parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst; + + if (level == -1) { + //merge case + adjustedCurrentLayerSizeEst = cubeStatsReader.estimateCubeSize(); + logger.info("adjustedCurrentLayerSizeEst: {}", adjustedCurrentLayerSizeEst); + } else if (level == 0) { + //base cuboid case + adjustedCurrentLayerSizeEst = cubeStatsReader.estimateLayerSize(0); + logger.info("adjustedCurrentLayerSizeEst: {}", adjustedCurrentLayerSizeEst); + } else { + parentLayerSizeEst = cubeStatsReader.estimateLayerSize(level - 1); + currentLayerSizeEst = cubeStatsReader.estimateLayerSize(level); + adjustedCurrentLayerSizeEst = totalMapInputMB / parentLayerSizeEst * currentLayerSizeEst; + logger.info("totalMapInputMB: {}, parentLayerSizeEst: {}, currentLayerSizeEst: {}, adjustedCurrentLayerSizeEst: {}", totalMapInputMB, parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst); + } + + // number of reduce tasks + int numReduceTasks = (int) Math.round(adjustedCurrentLayerSizeEst / perReduceInputMB * reduceCountRatio); + + // adjust reducer number for cube which has DISTINCT_COUNT measures for better performance + if (cubeDesc.hasMemoryHungryMeasures()) { + numReduceTasks = numReduceTasks * 4; + } + + // at least 1 reducer by default + numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks); + // no more than 500 reducer by default + numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks); + + job.setNumReduceTasks(numReduceTasks); + + logger.info("Setting " + Reducer.Context.NUM_REDUCES + "=" + numReduceTasks); + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/815887e7/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java index 810da23..e805d25 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java @@ -81,7 +81,7 @@ public class MergeCuboidJob extends CuboidJob { // add metadata to distributed cache attachKylinPropsAndMetadata(cube, job.getConfiguration()); - setReduceTaskNum(job, cube.getDescriptor(), 0); + LayerReduerNumSizing.setReduceTaskNum(job, cube.getSegmentById(segmentID), getTotalMapInputMB(), -1); this.deletePath(job.getConfiguration(), output);