KYLIN-1703 Fix concurrency issue brought by ToolRunner.run()
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/682a2f10 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/682a2f10 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/682a2f10 Branch: refs/heads/1.5.x-HBase1.x Commit: 682a2f10c570991a528e6996bd065703aba2a3ac Parents: a63f959 Author: lidongsjtu <lid...@apache.org> Authored: Wed May 18 10:22:44 2016 +0800 Committer: lidongsjtu <lid...@apache.org> Committed: Wed May 18 12:01:23 2016 +0800 ---------------------------------------------------------------------- .../java/org/apache/kylin/engine/mr/MRUtil.java | 25 +++++++++++++++++++- .../engine/mr/common/MapReduceExecutable.java | 8 ++++--- 2 files changed, 29 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/682a2f10/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java index d839c2c..b67dc38 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java @@ -18,6 +18,9 @@ package org.apache.kylin.engine.mr; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.hadoop.util.Tool; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide; @@ -38,7 +41,7 @@ public class MRUtil { public static IMRBatchCubingInputSide getBatchCubingInputSide(IRealizationSegment seg) { return SourceFactory.createEngineAdapter(seg, IMRInput.class).getBatchCubingInputSide(seg); } - + public static IMRTableInputFormat getTableInputFormat(String tableName) { return 
getTableInputFormat(getTableDesc(tableName)); } @@ -71,4 +74,24 @@ public class MRUtil { return StorageFactory.createEngineAdapter(seg, IMROutput.class).getBatchInvertedIndexingOutputSide(seg); } + // use this method instead of ToolRunner.run() because ToolRunner.run() is not thread-safe + // Refer to: http://stackoverflow.com/questions/22462665/is-hadoops-toorunner-thread-safe + public static int runMRJob(Tool tool, String[] args) throws Exception { + Configuration conf = tool.getConf(); + if (conf == null) { + conf = new Configuration(); + } + + GenericOptionsParser parser = getParser(conf, args); + //set the configuration back, so that Tool can configure itself + tool.setConf(conf); + + //get the args w/o generic hadoop args + String[] toolArgs = parser.getRemainingArgs(); + return tool.run(toolArgs); + } + + private static synchronized GenericOptionsParser getParser(Configuration conf, String[] args) throws Exception { + return new GenericOptionsParser(conf, args); + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/682a2f10/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java index 54459d7..aefcb95 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java @@ -31,13 +31,13 @@ import org.apache.hadoop.mapreduce.Cluster; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.JobStatus; -import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.util.RMHAUtils; import 
org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.ClassUtil; import org.apache.kylin.engine.mr.HadoopUtil; +import org.apache.kylin.engine.mr.MRUtil; import org.apache.kylin.job.constant.ExecutableConstants; import org.apache.kylin.job.constant.JobStepStatusEnum; import org.apache.kylin.job.exception.ExecuteException; @@ -115,7 +115,10 @@ public class MapReduceExecutable extends AbstractExecutable { String[] args = params.trim().split("\\s+"); try { //for async mr job, ToolRunner just return 0; - ToolRunner.run(hadoopJob, args); + + // use this method instead of ToolRunner.run() because ToolRunner.run() is not thread-safe + // Refer to: http://stackoverflow.com/questions/22462665/is-hadoops-toorunner-thread-safe + MRUtil.runMRJob(hadoopJob, args); if (hadoopJob.isSkipped()) { return new ExecuteResult(ExecuteResult.State.SUCCEED, "skipped"); @@ -266,5 +269,4 @@ public class MapReduceExecutable extends AbstractExecutable { public void setCounterSaveAs(String value) { setParam(KEY_COUNTER_SAVEAS, value); } - }