KYLIN-1703 Fix concurrency issue brought by ToolRunner.run()

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/682a2f10
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/682a2f10
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/682a2f10

Branch: refs/heads/1.5.x-HBase1.x
Commit: 682a2f10c570991a528e6996bd065703aba2a3ac
Parents: a63f959
Author: lidongsjtu <lid...@apache.org>
Authored: Wed May 18 10:22:44 2016 +0800
Committer: lidongsjtu <lid...@apache.org>
Committed: Wed May 18 12:01:23 2016 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/engine/mr/MRUtil.java | 25 +++++++++++++++++++-
 .../engine/mr/common/MapReduceExecutable.java   |  8 ++++---
 2 files changed, 29 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/682a2f10/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
index d839c2c..b67dc38 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
@@ -18,6 +18,9 @@
 
 package org.apache.kylin.engine.mr;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
@@ -38,7 +41,7 @@ public class MRUtil {
     public static IMRBatchCubingInputSide 
getBatchCubingInputSide(IRealizationSegment seg) {
         return SourceFactory.createEngineAdapter(seg, 
IMRInput.class).getBatchCubingInputSide(seg);
     }
-    
+
     public static IMRTableInputFormat getTableInputFormat(String tableName) {
         return getTableInputFormat(getTableDesc(tableName));
     }
@@ -71,4 +74,24 @@ public class MRUtil {
         return StorageFactory.createEngineAdapter(seg, 
IMROutput.class).getBatchInvertedIndexingOutputSide(seg);
     }
 
+    // use this method instead of ToolRunner.run() because ToolRunner.run() is 
not thread-safe
+    // Refer to: 
http://stackoverflow.com/questions/22462665/is-hadoops-toorunner-thread-safe
+    public static int runMRJob(Tool tool, String[] args) throws Exception {
+        Configuration conf = tool.getConf();
+        if (conf == null) {
+            conf = new Configuration();
+        }
+
+        GenericOptionsParser parser = getParser(conf, args);
+        //set the configuration back, so that Tool can configure itself
+        tool.setConf(conf);
+
+        //get the args w/o generic hadoop args
+        String[] toolArgs = parser.getRemainingArgs();
+        return tool.run(toolArgs);
+    }
+
+    private static synchronized GenericOptionsParser getParser(Configuration 
conf, String[] args) throws Exception {
+        return new GenericOptionsParser(conf, args);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/682a2f10/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
index 54459d7..aefcb95 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
@@ -31,13 +31,13 @@ import org.apache.hadoop.mapreduce.Cluster;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.util.RMHAUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.constant.JobStepStatusEnum;
 import org.apache.kylin.job.exception.ExecuteException;
@@ -115,7 +115,10 @@ public class MapReduceExecutable extends 
AbstractExecutable {
                 String[] args = params.trim().split("\\s+");
                 try {
                     //for async mr job, ToolRunner just return 0;
-                    ToolRunner.run(hadoopJob, args);
+
+                    // use this method instead of ToolRunner.run() because 
ToolRunner.run() is not thread-safe
+                    // Refer to: 
http://stackoverflow.com/questions/22462665/is-hadoops-toorunner-thread-safe
+                    MRUtil.runMRJob(hadoopJob, args);
 
                     if (hadoopJob.isSkipped()) {
                         return new ExecuteResult(ExecuteResult.State.SUCCEED, 
"skipped");
@@ -266,5 +269,4 @@ public class MapReduceExecutable extends AbstractExecutable 
{
     public void setCounterSaveAs(String value) {
         setParam(KEY_COUNTER_SAVEAS, value);
     }
-
 }

Reply via email to