OOZIE-2874 Make the Launcher Mapper map-only job's InputFormat class pluggable 
(andras.piros via gezapeti)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/4cb29f81
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/4cb29f81
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/4cb29f81

Branch: refs/heads/oya
Commit: 4cb29f81492d3b4250f43898a1b3ffae2b28e012
Parents: f5554dd
Author: Gezapeti Cseh <gezap...@gmail.com>
Authored: Tue May 9 10:43:54 2017 -0700
Committer: Gezapeti Cseh <gezap...@gmail.com>
Committed: Tue May 9 10:43:54 2017 -0700

----------------------------------------------------------------------
 .../oozie/action/hadoop/JavaActionExecutor.java | 22 +++++++++++++-------
 .../action/hadoop/LauncherMapperHelper.java     |  4 +++-
 core/src/main/resources/oozie-default.xml       |  8 +++++++
 release-log.txt                                 |  1 +
 4 files changed, 26 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/4cb29f81/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java 
b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index f62c997..d60a5c7 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@ -54,7 +54,6 @@ import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.RunningJob;
-import 
org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -103,7 +102,9 @@ public class JavaActionExecutor extends ActionExecutor {
     public static final String ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job";
     public static final String HADOOP_YARN_TIMELINE_SERVICE_ENABLED = 
"yarn.timeline-service.enabled";
     public static final String HADOOP_YARN_UBER_MODE = 
"mapreduce.job.ubertask.enable";
-    public static final String HADOOP_YARN_KILL_CHILD_JOBS_ON_AMRESTART = 
"oozie.action.launcher.am.restart.kill.childjobs";
+    public static final String OOZIE_ACTION_LAUNCHER_PREFIX = 
ActionExecutor.CONF_PREFIX  + "launcher.";
+    public static final String HADOOP_YARN_KILL_CHILD_JOBS_ON_AMRESTART =
+            OOZIE_ACTION_LAUNCHER_PREFIX + "am.restart.kill.childjobs";
     public static final String HADOOP_MAP_MEMORY_MB = 
"mapreduce.map.memory.mb";
     public static final String HADOOP_CHILD_JAVA_OPTS = 
"mapred.child.java.opts";
     public static final String HADOOP_MAP_JAVA_OPTS = 
"mapreduce.map.java.opts";
@@ -125,10 +126,11 @@ public class JavaActionExecutor extends ActionExecutor {
     protected XLog LOG = XLog.getLog(getClass());
     private static final Pattern heapPattern = 
Pattern.compile("-Xmx(([0-9]+)[mMgG])");
     private static final String JAVA_TMP_DIR_SETTINGS = "-Djava.io.tmpdir=";
-    public static final String CONF_HADOOP_YARN_UBER_MODE = 
"oozie.action.launcher." + HADOOP_YARN_UBER_MODE;
+    public static final String CONF_HADOOP_YARN_UBER_MODE = 
OOZIE_ACTION_LAUNCHER_PREFIX + HADOOP_YARN_UBER_MODE;
     public static final String HADOOP_JOB_CLASSLOADER = 
"mapreduce.job.classloader";
     public static final String HADOOP_USER_CLASSPATH_FIRST = 
"mapreduce.user.classpath.first";
     public static final String OOZIE_CREDENTIALS_SKIP = 
"oozie.credentials.skip";
+    private static final LauncherInputFormatClassLocator 
launcherInputFormatClassLocator = new LauncherInputFormatClassLocator();
 
     public XConfiguration workflowConf = null;
 
@@ -151,7 +153,7 @@ public class JavaActionExecutor extends ActionExecutor {
     public static List<Class> getCommonLauncherClasses() {
         List<Class> classes = new ArrayList<Class>();
         classes.add(LauncherMapper.class);
-        classes.add(OozieLauncherInputFormat.class);
+        classes.add(launcherInputFormatClassLocator.locateOrGet());
         classes.add(OozieLauncherOutputFormat.class);
         classes.add(OozieLauncherOutputCommitter.class);
         classes.add(LauncherMainHadoopUtils.class);
@@ -297,12 +299,12 @@ public class JavaActionExecutor extends ActionExecutor {
         // 2. oozie.action.#action-type#.launcher.mapreduce.job.ubertask.enable
         // 3. oozie.action.launcher.mapreduce.job.ubertask.enable
         if (launcherConf.get(HADOOP_YARN_UBER_MODE) == null) {
-            if (ConfigurationService.get("oozie.action." + getType() + 
".launcher." + HADOOP_YARN_UBER_MODE).length() > 0) {
-                if (ConfigurationService.getBoolean("oozie.action." + 
getType() + ".launcher." + HADOOP_YARN_UBER_MODE)) {
+            if (ConfigurationService.get(getActionTypeLauncherPrefix() + 
HADOOP_YARN_UBER_MODE).length() > 0) {
+                if 
(ConfigurationService.getBoolean(getActionTypeLauncherPrefix() + 
HADOOP_YARN_UBER_MODE)) {
                     launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, true);
                 }
             } else {
-                if (ConfigurationService.getBoolean("oozie.action.launcher." + 
HADOOP_YARN_UBER_MODE)) {
+                if 
(ConfigurationService.getBoolean(OOZIE_ACTION_LAUNCHER_PREFIX + 
HADOOP_YARN_UBER_MODE)) {
                     launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, true);
                 }
             }
@@ -312,7 +314,7 @@ public class JavaActionExecutor extends ActionExecutor {
     void injectLauncherTimelineServiceEnabled(Configuration launcherConf, 
Configuration actionConf) {
         // Getting delegation token for ATS. If tez-site.xml is present in 
distributed cache, turn on timeline service.
         if (actionConf.get("oozie.launcher." + 
HADOOP_YARN_TIMELINE_SERVICE_ENABLED) == null
-                && ConfigurationService.getBoolean("oozie.action.launcher." + 
HADOOP_YARN_TIMELINE_SERVICE_ENABLED)) {
+                && 
ConfigurationService.getBoolean(OOZIE_ACTION_LAUNCHER_PREFIX + 
HADOOP_YARN_TIMELINE_SERVICE_ENABLED)) {
             String cacheFiles = launcherConf.get("mapred.cache.files");
             if (cacheFiles != null && cacheFiles.contains("tez-site.xml")) {
                 launcherConf.setBoolean(HADOOP_YARN_TIMELINE_SERVICE_ENABLED, 
true);
@@ -1778,4 +1780,8 @@ public class JavaActionExecutor extends ActionExecutor {
         return workflowConf;
 
     }
+
+    private String getActionTypeLauncherPrefix() {
+        return "oozie.action." + getType() + ".launcher.";
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/4cb29f81/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java 
b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
index 9609fdc..72ed2f1 100644
--- 
a/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
+++ 
b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
@@ -57,6 +57,8 @@ public class LauncherMapperHelper {
 
     public static final String OOZIE_ACTION_YARN_TAG = "oozie.action.yarn.tag";
 
+    private static final LauncherInputFormatClassLocator 
launcherInputFormatClassLocator = new LauncherInputFormatClassLocator();
+
     public static String getRecoveryId(Configuration launcherConf, Path 
actionDir, String recoveryId)
             throws HadoopAccessorException, IOException {
         String jobId = null;
@@ -160,7 +162,7 @@ public class LauncherMapperHelper {
             IOUtils.closeSafely(os);
         }
 
-        launcherConf.setInputFormat(OozieLauncherInputFormat.class);
+        
launcherConf.setInputFormat(launcherInputFormatClassLocator.locateOrGet());
         launcherConf.setOutputFormat(OozieLauncherOutputFormat.class);
         launcherConf.setOutputCommitter(OozieLauncherOutputCommitter.class);
     }

http://git-wip-us.apache.org/repos/asf/oozie/blob/4cb29f81/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml 
b/core/src/main/resources/oozie-default.xml
index e7a48a0..076401d 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -1860,6 +1860,14 @@ will be the requeue interval for the actions which are 
waiting for a long time w
     </property>
 
     <property>
+        <name>oozie.action.launcher.mapreduce.input.format.class</name>
+        <value>org.apache.oozie.action.hadoop.OozieLauncherInputFormat</value>
+        <description>
+            Make the Launcher Mapper map-only job's InputFormat class 
pluggable in order to provide alternative implementations.
+        </description>
+    </property>
+
+    <property>
         <name>oozie.action.spark.setup.hadoop.conf.dir</name>
         <value>false</value>
         <description>

http://git-wip-us.apache.org/repos/asf/oozie/blob/4cb29f81/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index cb141cb..ee65cd1 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.4.0 release (trunk - unreleased)
 
+OOZIE-2874 Make the Launcher Mapper map-only job's InputFormat class pluggable 
(andras.piros via gezapeti)
 OOZIE-2751 LocalOozieClient is missing methods from OozieClient (abhishekbafna 
via rkanter)
 OOZIE-2870 non working examples in oozie documentation coordinator spec 
(andras.piros via pbacsko)
 OOZIE-2827 amend More directly view of the coordinator’s history from 
perspective of workflow action. (Alonzo Zhou via pbacsko)

Reply via email to