KYLIN-1656 Improve performance of MRv2 engine by making each mapper handle a configured number of records
Signed-off-by: shaofengshi <shaofeng...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c6aad4e9 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c6aad4e9 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c6aad4e9 Branch: refs/heads/master Commit: c6aad4e9636dbc52b3459268b45b433dc5f628ec Parents: 6c32fd6 Author: gaodayue <gaoda...@meituan.com> Authored: Thu May 5 12:59:19 2016 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Wed May 18 10:38:32 2016 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/KylinConfigBase.java | 4 + .../kylin/common/util/BufferedLogger.java | 47 ++++++ .../org/apache/kylin/job/JoinedFlatTable.java | 67 ++++---- .../kylin/job/constant/ExecutableConstants.java | 1 + .../apache/kylin/source/hive/HiveMRInput.java | 153 ++++++++++++++++++- 5 files changed, 243 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/c6aad4e9/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 1062749..14dda82 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -293,6 +293,10 @@ abstract public class KylinConfigBase implements Serializable { return Integer.parseInt(getOptional("kylin.job.mapreduce.max.reducer.number", "500")); } + public int getHadoopJobMapperInputRows() { + return Integer.parseInt(getOptional("kylin.job.mapreduce.mapper.input.rows", "500000")); + } + public boolean getRunAsRemoteCommand() { return 
Boolean.parseBoolean(getOptional("kylin.job.run.as.remote.cmd")); } http://git-wip-us.apache.org/repos/asf/kylin/blob/c6aad4e9/core-common/src/main/java/org/apache/kylin/common/util/BufferedLogger.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/util/BufferedLogger.java b/core-common/src/main/java/org/apache/kylin/common/util/BufferedLogger.java new file mode 100644 index 0000000..cef598d --- /dev/null +++ b/core-common/src/main/java/org/apache/kylin/common/util/BufferedLogger.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.common.util; + +/** + * A Logger that remembers all the logged message. 
+ * + * <b>This class is not thread-safe.</b> + */ +public class BufferedLogger implements Logger { + private final org.slf4j.Logger wrappedLogger; + private final StringBuilder buffer = new StringBuilder(); + + public BufferedLogger(org.slf4j.Logger wrappedLogger) { + this.wrappedLogger = wrappedLogger; + } + + @Override + public void log(String message) { + wrappedLogger.info(message); + buffer.append(message).append("\n"); + } + + public String getBufferedLog() { + return buffer.toString(); + } + + public void resetBuffer() { + buffer.setLength(0); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/c6aad4e9/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java index 5886325..6ae8110 100644 --- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java +++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java @@ -55,6 +55,36 @@ public class JoinedFlatTable { return storageDfsDir + "/" + intermediateTableDesc.getTableName(); } + public static String generateHiveSetStatements(JobEngineConfig engineConfig) throws IOException { + StringBuilder buffer = new StringBuilder(); + File hadoopPropertiesFile = new File(engineConfig.getHiveConfFilePath()); + + if (hadoopPropertiesFile.exists()) { + DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); + DocumentBuilder builder; + Document doc; + try { + builder = factory.newDocumentBuilder(); + doc = builder.parse(hadoopPropertiesFile); + NodeList nl = doc.getElementsByTagName("property"); + for (int i = 0; i < nl.getLength(); i++) { + String name = doc.getElementsByTagName("name").item(i).getFirstChild().getNodeValue(); + String value = doc.getElementsByTagName("value").item(i).getFirstChild().getNodeValue(); + if (!name.equals("tmpjars")) { + 
buffer.append("SET " + name + "=" + value + ";\n"); + } + } + + } catch (ParserConfigurationException e) { + throw new IOException(e); + } catch (SAXException e) { + throw new IOException(e); + } + } + + return buffer.toString(); + } + public static String generateCreateTableStatement(IJoinedFlatTableDesc intermediateTableDesc, String storageDfsDir) { StringBuilder ddl = new StringBuilder(); @@ -86,37 +116,16 @@ public class JoinedFlatTable { public static String generateInsertDataStatement(IJoinedFlatTableDesc intermediateTableDesc, JobEngineConfig engineConfig) throws IOException { StringBuilder sql = new StringBuilder(); - - File hadoopPropertiesFile = new File(engineConfig.getHiveConfFilePath()); - - if (hadoopPropertiesFile.exists()) { - DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); - DocumentBuilder builder; - Document doc; - try { - builder = factory.newDocumentBuilder(); - doc = builder.parse(hadoopPropertiesFile); - NodeList nl = doc.getElementsByTagName("property"); - for (int i = 0; i < nl.getLength(); i++) { - String name = doc.getElementsByTagName("name").item(i).getFirstChild().getNodeValue(); - String value = doc.getElementsByTagName("value").item(i).getFirstChild().getNodeValue(); - if (name.equals("tmpjars") == false) { - sql.append("SET " + name + "=" + value + ";").append("\n"); - } - } - - } catch (ParserConfigurationException e) { - throw new IOException(e); - } catch (SAXException e) { - throw new IOException(e); - } - } - + sql.append(generateHiveSetStatements(engineConfig)); sql.append("INSERT OVERWRITE TABLE " + intermediateTableDesc.getTableName() + " " + generateSelectDataStatement(intermediateTableDesc) + ";").append("\n"); - return sql.toString(); } + public static String generateRedistributeDataStatement(IJoinedFlatTableDesc intermediateTableDesc) { + final String tableName = intermediateTableDesc.getTableName(); + return "INSERT OVERWRITE TABLE " + tableName + " SELECT * FROM " + tableName + " distribute by 
rand();\n"; + } + public static String generateSelectDataStatement(IJoinedFlatTableDesc intermediateTableDesc) { StringBuilder sql = new StringBuilder(); sql.append("SELECT" + "\n"); @@ -135,6 +144,10 @@ public class JoinedFlatTable { return sql.toString(); } + public static String generateSelectRowCountStatement(IJoinedFlatTableDesc intermediateTableDesc, String outputDir) { + return "INSERT OVERWRITE DIRECTORY '" + outputDir + "' SELECT count(*) FROM " + intermediateTableDesc.getTableName() + ";\n"; + } + private static Map<String, String> buildTableAliasMap(DataModelDesc dataModelDesc) { Map<String, String> tableAliasMap = new HashMap<String, String>(); http://git-wip-us.apache.org/repos/asf/kylin/blob/c6aad4e9/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java index d47d550..50e8238 100644 --- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java +++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java @@ -37,6 +37,7 @@ public final class ExecutableConstants { public static final String STEP_NAME_BUILD_DICTIONARY = "Build Dimension Dictionary"; public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table"; public static final String STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP = "Materialize Hive View in Lookup Tables"; + public static final String STEP_NAME_REDISTRIBUTE_FLAT_HIVE_TABLE = "Redistribute Intermediate Flat Hive Table"; public static final String STEP_NAME_FACT_DISTINCT_COLUMNS = "Extract Fact Table Distinct Columns"; public static final String STEP_NAME_BUILD_BASE_CUBOID = "Build Base Cuboid Data"; public static final String STEP_NAME_BUILD_IN_MEM_CUBE = "Build Cube"; 
http://git-wip-us.apache.org/repos/asf/kylin/blob/c6aad4e9/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java index 9997b09..248a57b 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java @@ -19,9 +19,11 @@ package org.apache.kylin.source.hive; import java.io.IOException; +import java.io.InputStream; import java.util.Set; import com.google.common.collect.Sets; +import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -29,12 +31,17 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hive.hcatalog.data.HCatRecord; import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.BufferedLogger; +import org.apache.kylin.common.util.CliCommandExecutor; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.DimensionDesc; import org.apache.kylin.engine.mr.HadoopUtil; import org.apache.kylin.engine.mr.IMRInput; import org.apache.kylin.engine.mr.JobBuilderSupport; +import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; import org.apache.kylin.job.JoinedFlatTable; import org.apache.kylin.job.common.ShellExecutable; import org.apache.kylin.job.constant.ExecutableConstants; @@ -46,7 +53,6 @@ import org.apache.kylin.job.execution.ExecutableContext; import org.apache.kylin.job.execution.ExecuteResult; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import 
org.apache.kylin.metadata.model.TableDesc; -import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.metadata.realization.IRealizationSegment; public class HiveMRInput implements IMRInput { @@ -109,11 +115,14 @@ public class HiveMRInput implements IMRInput { @Override public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) { + final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams()); + jobFlow.addTask(createFlatHiveTableStep(conf, flatHiveTableDesc, jobFlow.getId())); AbstractExecutable task = createLookupHiveViewMaterializationStep(jobFlow.getId()); if(task != null) { jobFlow.addTask(task); } + jobFlow.addTask(createRedistributeFlatHiveTableStep(conf, flatHiveTableDesc, jobFlow.getId(), cubeName)); } public static AbstractExecutable createFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId) { @@ -142,7 +151,6 @@ public class HiveMRInput implements IMRInput { return step; } - public ShellExecutable createLookupHiveViewMaterializationStep(String jobId) { ShellExecutable step = new ShellExecutable();; step.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP); @@ -185,6 +193,27 @@ public class HiveMRInput implements IMRInput { return step; } + public static AbstractExecutable createRedistributeFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String cubeName) { + StringBuilder hiveInitBuf = new StringBuilder(); + hiveInitBuf.append("USE ").append(conf.getConfig().getHiveDatabaseForIntermediateTable()).append(";\n"); + try { + hiveInitBuf.append(JoinedFlatTable.generateHiveSetStatements(conf)); + } catch (IOException e) { + throw new RuntimeException("Failed to generate hive set statements for RedistributeFlatHiveTableStep", e); + } + + String rowCountOutputDir = JobBuilderSupport.getJobWorkingDir(conf, jobId) + "/row_count"; + + RedistributeFlatHiveTableStep step = new RedistributeFlatHiveTableStep(); + 
step.setInitStatement(hiveInitBuf.toString()); + step.setSelectRowCountStatement(JoinedFlatTable.generateSelectRowCountStatement(flatTableDesc, rowCountOutputDir)); + step.setRowCountOutputDir(rowCountOutputDir); + step.setRedistributeDataStatement(JoinedFlatTable.generateRedistributeDataStatement(flatTableDesc)); + CubingExecutableUtil.setCubeName(cubeName, step.getParams()); + step.setName(ExecutableConstants.STEP_NAME_REDISTRIBUTE_FLAT_HIVE_TABLE); + return step; + } + @Override public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) { GarbageCollectionStep step = new GarbageCollectionStep(); @@ -205,6 +234,126 @@ public class HiveMRInput implements IMRInput { } } + public static class RedistributeFlatHiveTableStep extends AbstractExecutable { + private final BufferedLogger stepLogger = new BufferedLogger(logger); + + private void computeRowCount(CliCommandExecutor cmdExecutor) throws IOException { + final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); + hiveCmdBuilder.addStatement(getInitStatement()); + hiveCmdBuilder.addStatement("set hive.exec.compress.output=false;\n"); + hiveCmdBuilder.addStatement(getSelectRowCountStatement()); + final String cmd = hiveCmdBuilder.build(); + + stepLogger.log("Compute row count of flat hive table, cmd: "); + stepLogger.log(cmd); + + Pair<Integer, String> response = cmdExecutor.execute(cmd, stepLogger); + if (response.getFirst() != 0) { + throw new RuntimeException("Failed to compute row count of flat hive table"); + } + } + + private long readRowCountFromFile(Path file) throws IOException { + FileSystem fs = FileSystem.get(file.toUri(), HadoopUtil.getCurrentConfiguration()); + InputStream in = fs.open(file); + try { + String content = IOUtils.toString(in); + return Long.valueOf(content.trim()); // strip the '\n' character + + } finally { + IOUtils.closeQuietly(in); + } + } + + private int determineNumReducer(KylinConfig config) throws IOException { + computeRowCount(config.getCliCommandExecutor()); + + 
Path rowCountFile = new Path(getRowCountOutputDir(), "000000_0"); + long rowCount = readRowCountFromFile(rowCountFile); + int mapperInputRows = config.getHadoopJobMapperInputRows(); + + int numReducers = Math.round(rowCount / ((float) mapperInputRows)); + numReducers = Math.max(1, numReducers); + + stepLogger.log("total input rows = " + rowCount); + stepLogger.log("expected input rows per mapper = " + mapperInputRows); + stepLogger.log("num reducers for RedistributeFlatHiveTableStep = " + numReducers); + + return numReducers; + } + + private void redistributeTable(KylinConfig config, int numReducers) throws IOException { + final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); + hiveCmdBuilder.addStatement(getInitStatement()); + hiveCmdBuilder.addStatement("set mapreduce.job.reduces=" + numReducers + ";\n"); + hiveCmdBuilder.addStatement("set hive.merge.mapredfiles=false;\n"); + hiveCmdBuilder.addStatement(getRedistributeDataStatement()); + final String cmd = hiveCmdBuilder.toString(); + + stepLogger.log("Redistribute table, cmd: "); + stepLogger.log(cmd); + + Pair<Integer, String> response = config.getCliCommandExecutor().execute(cmd, stepLogger); + if (response.getFirst() != 0) { + throw new RuntimeException("Failed to redistribute flat hive table"); + } + } + + private KylinConfig getCubeSpecificConfig() { + String cubeName = CubingExecutableUtil.getCubeName(getParams()); + CubeManager manager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); + CubeInstance cube = manager.getCube(cubeName); + return cube.getConfig(); + } + + @Override + protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { + KylinConfig config = getCubeSpecificConfig(); + + try { + int numReducers = determineNumReducer(config); + redistributeTable(config, numReducers); + return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog()); + + } catch (Exception e) { + logger.error("job:" + getId() + " execute finished with exception", 
e); + return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog()); + } + } + + public void setInitStatement(String sql) { + setParam("HiveInit", sql); + } + + public String getInitStatement() { + return getParam("HiveInit"); + } + + public void setSelectRowCountStatement(String sql) { + setParam("HiveSelectRowCount", sql); + } + + public String getSelectRowCountStatement() { + return getParam("HiveSelectRowCount"); + } + + public void setRedistributeDataStatement(String sql) { + setParam("HiveRedistributeData", sql); + } + + public String getRedistributeDataStatement() { + return getParam("HiveRedistributeData"); + } + + public void setRowCountOutputDir(String rowCountOutputDir) { + setParam("rowCountOutputDir", rowCountOutputDir); + } + + public String getRowCountOutputDir() { + return getParam("rowCountOutputDir"); + } + } + public static class GarbageCollectionStep extends AbstractExecutable { @Override protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {