Repository: kylin
Updated Branches:
  refs/heads/KYLIN-2246 [created] 815887e73


KYLIN-2246 redesign the way to decide layer cubing reducer count


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/815887e7
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/815887e7
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/815887e7

Branch: refs/heads/KYLIN-2246
Commit: 815887e73a5c3b0852b6cf5650400235797d7ce9
Parents: 59a30f6
Author: Hongbin Ma <mahong...@apache.org>
Authored: Mon Dec 5 21:02:36 2016 +0800
Committer: Hongbin Ma <mahong...@apache.org>
Committed: Mon Dec 5 21:02:44 2016 +0800

----------------------------------------------------------------------
 .../kylin/cube/cuboid/CuboidScheduler.java      | 31 +++++++-
 .../kylin/engine/mr/common/CubeStatsReader.java | 26 ++++++-
 .../apache/kylin/engine/mr/steps/CuboidJob.java | 52 +------------
 .../engine/mr/steps/LayerReduerNumSizing.java   | 82 ++++++++++++++++++++
 .../kylin/engine/mr/steps/MergeCuboidJob.java   |  2 +-
 5 files changed, 138 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
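
The gist of the change: instead of scaling the reducer count by the ratio of cuboid counts between layers, the layer cubing and merge jobs now size reducers from per-layer byte estimates read out of the segment statistics. In outline, using the names introduced in the diffs below (a sketch only; the config lookups and job wiring are omitted):

    // level -1 = merge job, 0 = base cuboid layer, >0 = n-th cubing layer
    double adjustedMB;
    if (level == -1) {
        adjustedMB = statsReader.estimateCubeSize();                // whole-cube estimate drives merge jobs
    } else if (level == 0) {
        adjustedMB = statsReader.estimateLayerSize(0);              // base cuboid layer only
    } else {
        adjustedMB = totalMapInputMB / statsReader.estimateLayerSize(level - 1)
                * statsReader.estimateLayerSize(level);             // scale map input by the layer size ratio
    }
    int numReduceTasks = (int) Math.round(adjustedMB / perReduceInputMB * reduceCountRatio);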


http://git-wip-us.apache.org/repos/asf/kylin/blob/815887e7/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
index bd6a37a..733aded 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
@@ -18,7 +18,7 @@
 
 package org.apache.kylin.cube.cuboid;
 
-/** 
+/**
  */
 
 import java.util.Collections;
@@ -31,6 +31,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.kylin.cube.model.AggregationGroup;
 import org.apache.kylin.cube.model.CubeDesc;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
@@ -39,6 +40,7 @@ public class CuboidScheduler {
     private final CubeDesc cubeDesc;
     private final long max;
     private final Map<Long, List<Long>> cache;
+    private List<List<Long>> cuboidsByLayer;
 
     public CuboidScheduler(CubeDesc cubeDesc) {
         this.cubeDesc = cubeDesc;
@@ -232,4 +234,31 @@ public class CuboidScheduler {
             getSubCuboidIds(cuboidId, result);
         }
     }
+
+    public List<List<Long>> getCuboidsByLayer() {
+        if (cuboidsByLayer != null) {
+            return cuboidsByLayer;
+        }
+
+        int totalNum = 0;
+        int layerNum = cubeDesc.getBuildLevel();
+        cuboidsByLayer = Lists.newArrayList();
+
+        cuboidsByLayer.add(Collections.singletonList(Cuboid.getBaseCuboidId(cubeDesc)));
+        totalNum++;
+
+        for (int i = 1; i <= layerNum; i++) {
+            List<Long> lastLayer = cuboidsByLayer.get(i - 1);
+            List<Long> newLayer = Lists.newArrayList();
+            for (Long parent : lastLayer) {
+                newLayer.addAll(getSpanningCuboid(parent));
+            }
+            cuboidsByLayer.add(newLayer);
+            totalNum += newLayer.size();
+        }
+
+        int size = getAllCuboidIds().size();
+        Preconditions.checkState(totalNum == size, "total Num: " + totalNum + " actual size: " + size);
+        return cuboidsByLayer;
+    }
 }
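
A usage sketch of the new accessor (it assumes a CubeDesc is already loaded); layer 0 holds just the base cuboid and every later layer holds the spanning children of the previous one:

    CuboidScheduler scheduler = new CuboidScheduler(cubeDesc);
    List<List<Long>> layers = scheduler.getCuboidsByLayer();
    for (int level = 0; level < layers.size(); level++) {
        System.out.println("layer " + level + " -> " + layers.get(level).size() + " cuboids");
    }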

http://git-wip-us.apache.org/repos/asf/kylin/blob/815887e7/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
index c917cfb..1cf5da6 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
@@ -29,6 +29,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
@@ -75,9 +76,11 @@ public class CubeStatsReader {
     final int mapperNumberOfFirstBuild; // becomes meaningless after merge
     final double mapperOverlapRatioOfFirstBuild; // becomes meaningless after merge
     final Map<Long, HyperLogLogPlusCounter> cuboidRowEstimatesHLL;
+    final CuboidScheduler cuboidScheduler;
 
     public CubeStatsReader(CubeSegment cubeSegment, KylinConfig kylinConfig) throws IOException {
         ResourceStore store = ResourceStore.getStore(kylinConfig);
+        cuboidScheduler = new CuboidScheduler(cubeSegment.getCubeDesc());
         String statsKey = cubeSegment.getStatisticsResourcePath();
         File tmpSeqFile = writeTmpSeqFile(store.getResource(statsKey).inputStream);
         Reader reader = null;
@@ -145,6 +148,10 @@ public class CubeStatsReader {
         return getCuboidSizeMapFromRowCount(seg, getCuboidRowEstimatesHLL());
     }
 
+    public double estimateCubeSize() {
+        return SumHelper.sumDouble(getCuboidSizeMap().values());
+    }
+
     public int getMapperNumberOfFirstBuild() {
         return mapperNumberOfFirstBuild;
     }
@@ -248,12 +255,23 @@
         out.println("----------------------------------------------------------------------------");
     }
 
+    //return MB
+    public double estimateLayerSize(int level) {
+        List<List<Long>> layeredCuboids = cuboidScheduler.getCuboidsByLayer();
+        Map<Long, Double> cuboidSizeMap = getCuboidSizeMap();
+        double ret = 0;
+        for (Long cuboidId : layeredCuboids.get(level)) {
+            ret += cuboidSizeMap.get(cuboidId);
+        }
+
+        logger.info("Estimating size for layer {}, all cuboids are {}, total size is {}", level, StringUtils.join(layeredCuboids.get(level), ","), ret);
+        return ret;
+    }
+
     private void printCuboidInfoTreeEntry(Map<Long, Long> cuboidRows, Map<Long, Double> cuboidSizes, PrintWriter out) {
-        CubeDesc cubeDesc = seg.getCubeDesc();
-        CuboidScheduler scheduler = new CuboidScheduler(cubeDesc);
-        long baseCuboid = Cuboid.getBaseCuboidId(cubeDesc);
+        long baseCuboid = Cuboid.getBaseCuboidId(seg.getCubeDesc());
         int dimensionCount = Long.bitCount(baseCuboid);
-        printCuboidInfoTree(-1L, baseCuboid, scheduler, cuboidRows, cuboidSizes, dimensionCount, 0, out);
+        printCuboidInfoTree(-1L, baseCuboid, cuboidScheduler, cuboidRows, cuboidSizes, dimensionCount, 0, out);
     }
 
     private void printKVInfo(PrintWriter writer) {
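
The two new estimators can be exercised on their own, roughly like this (cubeSegment and kylinConfig are assumed to be at hand; both figures are in MB):

    CubeStatsReader reader = new CubeStatsReader(cubeSegment, kylinConfig);
    double baseLayerMB = reader.estimateLayerSize(0);   // estimated size of the base cuboid layer
    double wholeCubeMB = reader.estimateCubeSize();     // estimated size of all cuboids together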

http://git-wip-us.apache.org/repos/asf/kylin/blob/815887e7/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
index ddd21b7..d3cb494 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer.Context;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
@@ -35,14 +34,11 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.CuboidCLI;
-import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
 import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.job.exception.JobException;
 import org.apache.kylin.job.execution.ExecutableManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -100,6 +96,7 @@ public class CuboidJob extends AbstractHadoopJob {
 
             CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
             CubeInstance cube = cubeMgr.getCube(cubeName);
+            CubeSegment segment = cube.getSegmentById(segmentID);
 
             if (checkSkip(cubingJobId)) {
                 logger.info("Skip job " + getOptionValue(OPTION_JOB_NAME) + " 
for " + segmentID + "[" + segmentID + "]");
@@ -113,7 +110,7 @@ public class CuboidJob extends AbstractHadoopJob {
             setJobClasspath(job, cube.getConfig());
 
             // Mapper
-            configureMapperInputFormat(cube.getSegmentById(segmentID));
+            configureMapperInputFormat(segment);
             job.setMapperClass(this.mapperClass);
             job.setMapOutputKeyClass(Text.class);
             job.setMapOutputValueClass(Text.class);
@@ -134,7 +131,7 @@ public class CuboidJob extends AbstractHadoopJob {
             // add metadata to distributed cache
             attachKylinPropsAndMetadata(cube, job.getConfiguration());
 
-            setReduceTaskNum(job, cube.getDescriptor(), nCuboidLevel);
+            LayerReduerNumSizing.setReduceTaskNum(job, segment, getTotalMapInputMB(), nCuboidLevel);
 
             this.deletePath(job.getConfiguration(), output);
 
@@ -163,49 +160,6 @@ public class CuboidJob extends AbstractHadoopJob {
         }
     }
 
-    protected void setReduceTaskNum(Job job, CubeDesc cubeDesc, int level) throws ClassNotFoundException, IOException, InterruptedException, JobException {
-        KylinConfig kylinConfig = cubeDesc.getConfig();
-
-        double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
-        double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
-
-        // total map input MB
-        double totalMapInputMB = this.getTotalMapInputMB();
-
-        // output / input ratio
-        int preLevelCuboids, thisLevelCuboids;
-        if (level == 0) { // base cuboid
-            preLevelCuboids = thisLevelCuboids = 1;
-        } else { // n-cuboid
-            int[] allLevelCount = CuboidCLI.calculateAllLevelCount(cubeDesc);
-            preLevelCuboids = allLevelCount[level - 1];
-            thisLevelCuboids = allLevelCount[level];
-        }
-
-        // total reduce input MB
-        double totalReduceInputMB = totalMapInputMB * thisLevelCuboids / preLevelCuboids;
-
-        // number of reduce tasks
-        int numReduceTasks = (int) Math.round(totalReduceInputMB / perReduceInputMB * reduceCountRatio);
-
-        // adjust reducer number for cube which has DISTINCT_COUNT measures for better performance
-        if (cubeDesc.hasMemoryHungryMeasures()) {
-            numReduceTasks = numReduceTasks * 4;
-        }
-
-        // at least 1 reducer by default
-        numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
-        // no more than 500 reducer by default
-        numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
-
-        job.setNumReduceTasks(numReduceTasks);
-
-        logger.info("Having total map input MB " + Math.round(totalMapInputMB));
-        logger.info("Having level " + level + ", pre-level cuboids " + preLevelCuboids + ", this level cuboids " + thisLevelCuboids);
-        logger.info("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio);
-        logger.info("Setting " + Context.NUM_REDUCES + "=" + numReduceTasks);
-    }
-
     /**
      * @param mapperClass
      *            the mapperClass to set
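
The removed setReduceTaskNum scaled the map input by the ratio of cuboid counts between layers, which says nothing about how large those cuboids actually are; the replacement scales by estimated layer sizes instead. A contrast with made-up figures (not taken from this commit):

    // layer N-1: 5 cuboids totalling 900 MB; layer N: 10 cuboids totalling 100 MB; 1000 MB of map input
    double oldEstimate = 1000.0 * 10 / 5;           // = 2000 MB, driven purely by cuboid counts
    double newEstimate = 1000.0 / 900.0 * 100.0;    // ~= 111 MB, driven by estimated layer sizes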

http://git-wip-us.apache.org/repos/asf/kylin/blob/815887e7/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReduerNumSizing.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReduerNumSizing.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReduerNumSizing.java
new file mode 100644
index 0000000..6bddcbd
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReduerNumSizing.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.common.CubeStatsReader;
+import org.apache.kylin.job.exception.JobException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LayerReduerNumSizing {
+
+    private static final Logger logger = LoggerFactory.getLogger(LayerReduerNumSizing.class);
+
+    public static void setReduceTaskNum(Job job, CubeSegment cubeSegment, double totalMapInputMB, int level) throws ClassNotFoundException, IOException, InterruptedException, JobException {
+        CubeDesc cubeDesc = cubeSegment.getCubeDesc();
+        KylinConfig kylinConfig = cubeDesc.getConfig();
+
+        double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
+        double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
+        logger.info("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio + ", level " + level);
+
+        CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, kylinConfig);
+
+        double parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst;
+
+        if (level == -1) {
+            //merge case
+            adjustedCurrentLayerSizeEst = cubeStatsReader.estimateCubeSize();
+            logger.info("adjustedCurrentLayerSizeEst: {}", adjustedCurrentLayerSizeEst);
+        } else if (level == 0) {
+            //base cuboid case
+            adjustedCurrentLayerSizeEst = cubeStatsReader.estimateLayerSize(0);
+            logger.info("adjustedCurrentLayerSizeEst: {}", adjustedCurrentLayerSizeEst);
+        } else {
+            parentLayerSizeEst = cubeStatsReader.estimateLayerSize(level - 1);
+            currentLayerSizeEst = cubeStatsReader.estimateLayerSize(level);
+            adjustedCurrentLayerSizeEst = totalMapInputMB / parentLayerSizeEst * currentLayerSizeEst;
+            logger.info("totalMapInputMB: {}, parentLayerSizeEst: {}, currentLayerSizeEst: {}, adjustedCurrentLayerSizeEst: {}", totalMapInputMB, parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst);
+        }
+
+        // number of reduce tasks
+        int numReduceTasks = (int) Math.round(adjustedCurrentLayerSizeEst / perReduceInputMB * reduceCountRatio);
+
+        // adjust reducer number for cube which has DISTINCT_COUNT measures for better performance
+        if (cubeDesc.hasMemoryHungryMeasures()) {
+            numReduceTasks = numReduceTasks * 4;
+        }
+
+        // at least 1 reducer by default
+        numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
+        // no more than 500 reducer by default
+        numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
+
+        job.setNumReduceTasks(numReduceTasks);
+
+        logger.info("Setting " + Reducer.Context.NUM_REDUCES + "=" + 
numReduceTasks);
+    }
+
+}
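
A worked example of the sizing above, with hypothetical figures rather than anything read from a real segment:

    // perReduceInputMB = 500, reduceCountRatio = 1.0, totalMapInputMB = 2000,
    // parent layer estimated at 800 MB, current layer at 400 MB
    double adjustedCurrentLayerSizeEst = 2000.0 / 800.0 * 400.0;     // = 1000 MB of expected reduce input
    int numReduceTasks = (int) Math.round(1000.0 / 500.0 * 1.0);     // = 2 reducers, before the min/max clamps
    // a cube with memory-hungry measures (e.g. DISTINCT_COUNT) would then be bumped to 2 * 4 = 8 reducers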

http://git-wip-us.apache.org/repos/asf/kylin/blob/815887e7/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
index 810da23..e805d25 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
@@ -81,7 +81,7 @@ public class MergeCuboidJob extends CuboidJob {
             // add metadata to distributed cache
             attachKylinPropsAndMetadata(cube, job.getConfiguration());
 
-            setReduceTaskNum(job, cube.getDescriptor(), 0);
+            LayerReduerNumSizing.setReduceTaskNum(job, cube.getSegmentById(segmentID), getTotalMapInputMB(), -1);
 
             this.deletePath(job.getConfiguration(), output);
 
