OOZIE-2874 Make the Launcher Mapper map-only job's InputFormat class pluggable (andras.piros via gezapeti)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/4cb29f81 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/4cb29f81 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/4cb29f81 Branch: refs/heads/oya Commit: 4cb29f81492d3b4250f43898a1b3ffae2b28e012 Parents: f5554dd Author: Gezapeti Cseh <gezap...@gmail.com> Authored: Tue May 9 10:43:54 2017 -0700 Committer: Gezapeti Cseh <gezap...@gmail.com> Committed: Tue May 9 10:43:54 2017 -0700 ---------------------------------------------------------------------- .../oozie/action/hadoop/JavaActionExecutor.java | 22 +++++++++++++------- .../action/hadoop/LauncherMapperHelper.java | 4 +++- core/src/main/resources/oozie-default.xml | 8 +++++++ release-log.txt | 1 + 4 files changed, 26 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/4cb29f81/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java index f62c997..d60a5c7 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java @@ -54,7 +54,6 @@ import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobID; import org.apache.hadoop.mapred.RunningJob; -import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; @@ -103,7 +102,9 @@ public class JavaActionExecutor extends ActionExecutor { public static final 
String ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job"; public static final String HADOOP_YARN_TIMELINE_SERVICE_ENABLED = "yarn.timeline-service.enabled"; public static final String HADOOP_YARN_UBER_MODE = "mapreduce.job.ubertask.enable"; - public static final String HADOOP_YARN_KILL_CHILD_JOBS_ON_AMRESTART = "oozie.action.launcher.am.restart.kill.childjobs"; + public static final String OOZIE_ACTION_LAUNCHER_PREFIX = ActionExecutor.CONF_PREFIX + "launcher."; + public static final String HADOOP_YARN_KILL_CHILD_JOBS_ON_AMRESTART = + OOZIE_ACTION_LAUNCHER_PREFIX + "am.restart.kill.childjobs"; public static final String HADOOP_MAP_MEMORY_MB = "mapreduce.map.memory.mb"; public static final String HADOOP_CHILD_JAVA_OPTS = "mapred.child.java.opts"; public static final String HADOOP_MAP_JAVA_OPTS = "mapreduce.map.java.opts"; @@ -125,10 +126,11 @@ public class JavaActionExecutor extends ActionExecutor { protected XLog LOG = XLog.getLog(getClass()); private static final Pattern heapPattern = Pattern.compile("-Xmx(([0-9]+)[mMgG])"); private static final String JAVA_TMP_DIR_SETTINGS = "-Djava.io.tmpdir="; - public static final String CONF_HADOOP_YARN_UBER_MODE = "oozie.action.launcher." 
+ HADOOP_YARN_UBER_MODE; + public static final String CONF_HADOOP_YARN_UBER_MODE = OOZIE_ACTION_LAUNCHER_PREFIX + HADOOP_YARN_UBER_MODE; public static final String HADOOP_JOB_CLASSLOADER = "mapreduce.job.classloader"; public static final String HADOOP_USER_CLASSPATH_FIRST = "mapreduce.user.classpath.first"; public static final String OOZIE_CREDENTIALS_SKIP = "oozie.credentials.skip"; + private static final LauncherInputFormatClassLocator launcherInputFormatClassLocator = new LauncherInputFormatClassLocator(); public XConfiguration workflowConf = null; @@ -151,7 +153,7 @@ public class JavaActionExecutor extends ActionExecutor { public static List<Class> getCommonLauncherClasses() { List<Class> classes = new ArrayList<Class>(); classes.add(LauncherMapper.class); - classes.add(OozieLauncherInputFormat.class); + classes.add(launcherInputFormatClassLocator.locateOrGet()); classes.add(OozieLauncherOutputFormat.class); classes.add(OozieLauncherOutputCommitter.class); classes.add(LauncherMainHadoopUtils.class); @@ -297,12 +299,12 @@ public class JavaActionExecutor extends ActionExecutor { // 2. oozie.action.#action-type#.launcher.mapreduce.job.ubertask.enable // 3. oozie.action.launcher.mapreduce.job.ubertask.enable if (launcherConf.get(HADOOP_YARN_UBER_MODE) == null) { - if (ConfigurationService.get("oozie.action." + getType() + ".launcher." + HADOOP_YARN_UBER_MODE).length() > 0) { - if (ConfigurationService.getBoolean("oozie.action." + getType() + ".launcher." + HADOOP_YARN_UBER_MODE)) { + if (ConfigurationService.get(getActionTypeLauncherPrefix() + HADOOP_YARN_UBER_MODE).length() > 0) { + if (ConfigurationService.getBoolean(getActionTypeLauncherPrefix() + HADOOP_YARN_UBER_MODE)) { launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, true); } } else { - if (ConfigurationService.getBoolean("oozie.action.launcher." 
+ HADOOP_YARN_UBER_MODE)) { + if (ConfigurationService.getBoolean(OOZIE_ACTION_LAUNCHER_PREFIX + HADOOP_YARN_UBER_MODE)) { launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, true); } } @@ -312,7 +314,7 @@ public class JavaActionExecutor extends ActionExecutor { void injectLauncherTimelineServiceEnabled(Configuration launcherConf, Configuration actionConf) { // Getting delegation token for ATS. If tez-site.xml is present in distributed cache, turn on timeline service. if (actionConf.get("oozie.launcher." + HADOOP_YARN_TIMELINE_SERVICE_ENABLED) == null - && ConfigurationService.getBoolean("oozie.action.launcher." + HADOOP_YARN_TIMELINE_SERVICE_ENABLED)) { + && ConfigurationService.getBoolean(OOZIE_ACTION_LAUNCHER_PREFIX + HADOOP_YARN_TIMELINE_SERVICE_ENABLED)) { String cacheFiles = launcherConf.get("mapred.cache.files"); if (cacheFiles != null && cacheFiles.contains("tez-site.xml")) { launcherConf.setBoolean(HADOOP_YARN_TIMELINE_SERVICE_ENABLED, true); @@ -1778,4 +1780,8 @@ public class JavaActionExecutor extends ActionExecutor { return workflowConf; } + + private String getActionTypeLauncherPrefix() { + return "oozie.action." 
+ getType() + ".launcher."; + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/4cb29f81/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java index 9609fdc..72ed2f1 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java @@ -57,6 +57,8 @@ public class LauncherMapperHelper { public static final String OOZIE_ACTION_YARN_TAG = "oozie.action.yarn.tag"; + private static final LauncherInputFormatClassLocator launcherInputFormatClassLocator = new LauncherInputFormatClassLocator(); + public static String getRecoveryId(Configuration launcherConf, Path actionDir, String recoveryId) throws HadoopAccessorException, IOException { String jobId = null; @@ -160,7 +162,7 @@ public class LauncherMapperHelper { IOUtils.closeSafely(os); } - launcherConf.setInputFormat(OozieLauncherInputFormat.class); + launcherConf.setInputFormat(launcherInputFormatClassLocator.locateOrGet()); launcherConf.setOutputFormat(OozieLauncherOutputFormat.class); launcherConf.setOutputCommitter(OozieLauncherOutputCommitter.class); } http://git-wip-us.apache.org/repos/asf/oozie/blob/4cb29f81/core/src/main/resources/oozie-default.xml ---------------------------------------------------------------------- diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml index e7a48a0..076401d 100644 --- a/core/src/main/resources/oozie-default.xml +++ b/core/src/main/resources/oozie-default.xml @@ -1860,6 +1860,14 @@ will be the requeue interval for the actions which are waiting for a long time w </property> <property> + <name>oozie.action.launcher.mapreduce.input.format.class</name> + 
<value>org.apache.oozie.action.hadoop.OozieLauncherInputFormat</value> + <description> + Make the Launcher Mapper map-only job's InputFormat class pluggable in order to provide alternative implementations. + </description> + </property> + + <property> <name>oozie.action.spark.setup.hadoop.conf.dir</name> <value>false</value> <description> http://git-wip-us.apache.org/repos/asf/oozie/blob/4cb29f81/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index cb141cb..ee65cd1 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.4.0 release (trunk - unreleased) +OOZIE-2874 Make the Launcher Mapper map-only job's InputFormat class pluggable (andras.piros via gezapeti) OOZIE-2751 LocalOozieClient is missing methods from OozieClient (abhishekbafna via rkanter) OOZIE-2870 non working examples in oozie documentation coordinator spec (andras.piros via pbacsko) OOZIE-2827 amend More directly view of the coordinator's history from perspective of workflow action. (Alonzo Zhou via pbacsko)