minor, set reducer number in job Signed-off-by: Billy Liu <billy...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b736175f Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b736175f Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b736175f Branch: refs/heads/master-hbase1.x Commit: b736175f417f48fe07d01c0e2c99e040c3242d6f Parents: df5faf3 Author: Roger Shi <roger@Rogers-MacBook-Pro.local> Authored: Thu Jan 5 15:38:51 2017 +0800 Committer: Billy Liu <billy...@apache.org> Committed: Thu Jan 5 15:51:01 2017 +0800 ---------------------------------------------------------------------- .../java/org/apache/kylin/engine/mr/steps/CuboidJob.java | 2 +- .../apache/kylin/engine/mr/steps/LayerReducerNumSizing.java | 8 ++------ .../org/apache/kylin/engine/mr/steps/MergeCuboidJob.java | 2 +- 3 files changed, 4 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/b736175f/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java index ef25b55..b2e186d 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java @@ -131,7 +131,7 @@ public class CuboidJob extends AbstractHadoopJob { // add metadata to distributed cache attachSegmentMetadataWithDict(segment, job.getConfiguration()); - LayerReducerNumSizing.setReduceTaskNum(job, segment, getTotalMapInputMB(), nCuboidLevel); + job.setNumReduceTasks(LayerReducerNumSizing.getReduceTaskNum(segment, getTotalMapInputMB(), nCuboidLevel)); this.deletePath(job.getConfiguration(), output); http://git-wip-us.apache.org/repos/asf/kylin/blob/b736175f/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReducerNumSizing.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReducerNumSizing.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReducerNumSizing.java index 713a95c..7ce9842 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReducerNumSizing.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReducerNumSizing.java @@ -20,8 +20,6 @@ package org.apache.kylin.engine.mr.steps; import java.io.IOException; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Reducer; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.model.CubeDesc; @@ -34,7 +32,7 @@ public class LayerReducerNumSizing { private static final Logger logger = LoggerFactory.getLogger(LayerReducerNumSizing.class); - public static void setReduceTaskNum(Job job, CubeSegment cubeSegment, double totalMapInputMB, int level) throws ClassNotFoundException, IOException, InterruptedException, JobException { + public static int getReduceTaskNum(CubeSegment cubeSegment, double totalMapInputMB, int level) throws ClassNotFoundException, IOException, InterruptedException, JobException { CubeDesc cubeDesc = cubeSegment.getCubeDesc(); KylinConfig kylinConfig = cubeDesc.getConfig(); @@ -76,9 +74,7 @@ public class LayerReducerNumSizing { // no more than 500 reducer by default numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks); - job.setNumReduceTasks(numReduceTasks); - - logger.info("Setting " + Reducer.Context.NUM_REDUCES + "=" + numReduceTasks); + return numReduceTasks; } } http://git-wip-us.apache.org/repos/asf/kylin/blob/b736175f/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java index d9ff616..84b76e3 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java @@ -82,7 +82,7 @@ public class MergeCuboidJob extends CuboidJob { // TODO actually only dictionaries from merging segments are needed attachCubeMetadataWithDict(cube, job.getConfiguration()); - LayerReducerNumSizing.setReduceTaskNum(job, cube.getSegmentById(segmentID), getTotalMapInputMB(), -1); + job.setNumReduceTasks(LayerReducerNumSizing.getReduceTaskNum(cube.getSegmentById(segmentID), getTotalMapInputMB(), -1)); this.deletePath(job.getConfiguration(), output);