http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java index 06ae5fd..f4c1127 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java @@ -25,21 +25,22 @@ import java.net.ConnectException; import java.net.URI; import java.net.URISyntaxException; import java.net.UnknownHostException; -import java.security.PrivilegedExceptionAction; +import java.nio.ByteBuffer; import java.text.MessageFormat; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; import java.util.Properties; import java.util.Set; -import java.util.regex.Matcher; -import java.util.regex.Pattern; +import org.apache.commons.io.IOUtils; import com.google.common.annotations.VisibleForTesting; import com.google.common.primitives.Ints; import org.apache.hadoop.conf.Configuration; @@ -48,23 +49,41 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.AccessControlException; -import org.apache.oozie.hadoop.utils.HadoopShims; +import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobID; -import org.apache.hadoop.mapred.RunningJob; +import org.apache.hadoop.mapred.TaskLog; +import 
org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager; +import org.apache.hadoop.mapreduce.v2.util.MRApps; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.DiskChecker; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.client.api.YarnClientApplication; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.util.Apps; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.Records; import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.WorkflowJobBean; import org.apache.oozie.action.ActionExecutor; import org.apache.oozie.action.ActionExecutorException; import org.apache.oozie.client.OozieClient; import org.apache.oozie.client.WorkflowAction; -import org.apache.oozie.client.WorkflowJob; import org.apache.oozie.command.coord.CoordActionStartXCommand; import org.apache.oozie.service.ConfigurationService; import org.apache.oozie.service.HadoopAccessorException; @@ -72,8 +91,8 @@ import org.apache.oozie.service.HadoopAccessorService; import org.apache.oozie.service.Services; import 
org.apache.oozie.service.ShareLibService; import org.apache.oozie.service.URIHandlerService; -import org.apache.oozie.service.UserGroupInformationService; import org.apache.oozie.service.WorkflowAppService; +import org.apache.oozie.util.ClasspathUtils; import org.apache.oozie.util.ELEvaluationException; import org.apache.oozie.util.ELEvaluator; import org.apache.oozie.util.JobUtils; @@ -86,18 +105,22 @@ import org.jdom.Element; import org.jdom.JDOMException; import org.jdom.Namespace; +import com.google.common.collect.ImmutableList; +import com.google.common.io.Closeables; + public class JavaActionExecutor extends ActionExecutor { - protected static final String HADOOP_USER = "user.name"; - public static final String HADOOP_JOB_TRACKER = "mapred.job.tracker"; - public static final String HADOOP_JOB_TRACKER_2 = "mapreduce.jobtracker.address"; + public static final String RUNNING = "RUNNING"; + public static final String SUCCEEDED = "SUCCEEDED"; + public static final String KILLED = "KILLED"; + public static final String FAILED = "FAILED"; + public static final String FAILED_KILLED = "FAILED/KILLED"; public static final String HADOOP_YARN_RM = "yarn.resourcemanager.address"; public static final String HADOOP_NAME_NODE = "fs.default.name"; - private static final String HADOOP_JOB_NAME = "mapred.job.name"; public static final String OOZIE_COMMON_LIBDIR = "oozie"; - private static final Set<String> DISALLOWED_PROPERTIES = new HashSet<String>(); - public final static String MAX_EXTERNAL_STATS_SIZE = "oozie.external.stats.max.size"; + + public static final String MAX_EXTERNAL_STATS_SIZE = "oozie.external.stats.max.size"; public static final String ACL_VIEW_JOB = "mapreduce.job.acl-view-job"; public static final String ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job"; public static final String HADOOP_YARN_TIMELINE_SERVICE_ENABLED = "yarn.timeline-service.enabled"; @@ -111,34 +134,32 @@ public class JavaActionExecutor extends ActionExecutor { public static final String 
HADOOP_REDUCE_JAVA_OPTS = "mapreduce.reduce.java.opts"; public static final String HADOOP_CHILD_JAVA_ENV = "mapred.child.env"; public static final String HADOOP_MAP_JAVA_ENV = "mapreduce.map.env"; + public static final String HADOOP_JOB_CLASSLOADER = "mapreduce.job.classloader"; + public static final String HADOOP_USER_CLASSPATH_FIRST = "mapreduce.user.classpath.first"; + public static final String OOZIE_CREDENTIALS_SKIP = "oozie.credentials.skip"; public static final String YARN_AM_RESOURCE_MB = "yarn.app.mapreduce.am.resource.mb"; public static final String YARN_AM_COMMAND_OPTS = "yarn.app.mapreduce.am.command-opts"; public static final String YARN_AM_ENV = "yarn.app.mapreduce.am.env"; - private static final String JAVA_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.JavaMain"; public static final int YARN_MEMORY_MB_MIN = 512; + + private static final String JAVA_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.JavaMain"; + private static final String HADOOP_JOB_NAME = "mapred.job.name"; + private static final Set<String> DISALLOWED_PROPERTIES = new HashSet<String>(); + private static int maxActionOutputLen; private static int maxExternalStatsSize; private static int maxFSGlobMax; - private static final String SUCCEEDED = "SUCCEEDED"; - private static final String KILLED = "KILLED"; - private static final String FAILED = "FAILED"; - private static final String FAILED_KILLED = "FAILED/KILLED"; + + protected static final String HADOOP_USER = "user.name"; + protected XLog LOG = XLog.getLog(getClass()); - private static final Pattern heapPattern = Pattern.compile("-Xmx(([0-9]+)[mMgG])"); private static final String JAVA_TMP_DIR_SETTINGS = "-Djava.io.tmpdir="; - public static final String CONF_HADOOP_YARN_UBER_MODE = OOZIE_ACTION_LAUNCHER_PREFIX + HADOOP_YARN_UBER_MODE; - public static final String HADOOP_JOB_CLASSLOADER = "mapreduce.job.classloader"; - public static final String HADOOP_USER_CLASSPATH_FIRST = "mapreduce.user.classpath.first"; - public static final 
String OOZIE_CREDENTIALS_SKIP = "oozie.credentials.skip"; - private static final LauncherInputFormatClassLocator launcherInputFormatClassLocator = new LauncherInputFormatClassLocator(); public XConfiguration workflowConf = null; static { DISALLOWED_PROPERTIES.add(HADOOP_USER); - DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER); DISALLOWED_PROPERTIES.add(HADOOP_NAME_NODE); - DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER_2); DISALLOWED_PROPERTIES.add(HADOOP_YARN_RM); } @@ -150,20 +171,17 @@ public class JavaActionExecutor extends ActionExecutor { super(type); } - public static List<Class> getCommonLauncherClasses() { - List<Class> classes = new ArrayList<Class>(); - classes.add(LauncherMapper.class); - classes.add(launcherInputFormatClassLocator.locateOrGet()); - classes.add(OozieLauncherOutputFormat.class); - classes.add(OozieLauncherOutputCommitter.class); - classes.add(LauncherMainHadoopUtils.class); - classes.add(HadoopShims.class); + public static List<Class<?>> getCommonLauncherClasses() { + List<Class<?>> classes = new ArrayList<Class<?>>(); + classes.add(LauncherMain.class); classes.addAll(Services.get().get(URIHandlerService.class).getClassesForLauncher()); + classes.add(LauncherAM.class); + classes.add(LauncherAMCallbackNotifier.class); return classes; } - public List<Class> getLauncherClasses() { - List<Class> classes = new ArrayList<Class>(); + public List<Class<?>> getLauncherClasses() { + List<Class<?>> classes = new ArrayList<Class<?>>(); try { classes.add(Class.forName(JAVA_MAIN_CLASS_NAME)); } @@ -176,7 +194,7 @@ public class JavaActionExecutor extends ActionExecutor { @Override public void initActionType() { super.initActionType(); - maxActionOutputLen = ConfigurationService.getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA); + maxActionOutputLen = ConfigurationService.getInt(LauncherAM.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA); //Get the limit for the maximum allowed size of action stats maxExternalStatsSize = 
ConfigurationService.getInt(JavaActionExecutor.MAX_EXTERNAL_STATS_SIZE); maxExternalStatsSize = (maxExternalStatsSize == -1) ? Integer.MAX_VALUE : maxExternalStatsSize; @@ -217,31 +235,32 @@ public class JavaActionExecutor extends ActionExecutor { } } - public JobConf createBaseHadoopConf(Context context, Element actionXml) { + public Configuration createBaseHadoopConf(Context context, Element actionXml) { return createBaseHadoopConf(context, actionXml, true); } - protected JobConf createBaseHadoopConf(Context context, Element actionXml, boolean loadResources) { + protected Configuration createBaseHadoopConf(Context context, Element actionXml, boolean loadResources) { + Namespace ns = actionXml.getNamespace(); String jobTracker = actionXml.getChild("job-tracker", ns).getTextTrim(); String nameNode = actionXml.getChild("name-node", ns).getTextTrim(); - JobConf conf = null; + Configuration conf = null; if (loadResources) { conf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker); } else { - conf = new JobConf(false); + conf = new Configuration(false); } + conf.set(HADOOP_USER, context.getProtoActionConf().get(WorkflowAppService.HADOOP_USER)); - conf.set(HADOOP_JOB_TRACKER, jobTracker); - conf.set(HADOOP_JOB_TRACKER_2, jobTracker); conf.set(HADOOP_YARN_RM, jobTracker); conf.set(HADOOP_NAME_NODE, nameNode); conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "true"); + return conf; } - protected JobConf loadHadoopDefaultResources(Context context, Element actionXml) { + protected Configuration loadHadoopDefaultResources(Context context, Element actionXml) { return createBaseHadoopConf(context, actionXml); } @@ -266,7 +285,7 @@ public class JavaActionExecutor extends ActionExecutor { XConfiguration launcherConf = new XConfiguration(); // Inject action defaults for launcher HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); - XConfiguration actionDefaultConf = 
has.createActionDefaultConf(conf.get(HADOOP_JOB_TRACKER), getType()); + XConfiguration actionDefaultConf = has.createActionDefaultConf(conf.get(HADOOP_YARN_RM), getType()); injectLauncherProperties(actionDefaultConf, launcherConf); // Inject <job-xml> and <configuration> for launcher try { @@ -276,15 +295,8 @@ public class JavaActionExecutor extends ActionExecutor { } catch (URISyntaxException ex) { throw convertException(ex); } - // Inject use uber mode for launcher - injectLauncherUseUberMode(launcherConf); XConfiguration.copy(launcherConf, conf); checkForDisallowedProps(launcherConf, "launcher configuration"); - // Inject config-class for launcher to use for action - Element e = actionXml.getChild("config-class", ns); - if (e != null) { - conf.set(LauncherMapper.OOZIE_ACTION_CONFIG_CLASS, e.getTextTrim()); - } return conf; } catch (IOException ex) { @@ -292,25 +304,6 @@ public class JavaActionExecutor extends ActionExecutor { } } - void injectLauncherUseUberMode(Configuration launcherConf) { - // Set Uber Mode for the launcher (YARN only, ignored by MR1) - // Priority: - // 1. action's <configuration> - // 2. oozie.action.#action-type#.launcher.mapreduce.job.ubertask.enable - // 3. oozie.action.launcher.mapreduce.job.ubertask.enable - if (launcherConf.get(HADOOP_YARN_UBER_MODE) == null) { - if (ConfigurationService.get(getActionTypeLauncherPrefix() + HADOOP_YARN_UBER_MODE).length() > 0) { - if (ConfigurationService.getBoolean(getActionTypeLauncherPrefix() + HADOOP_YARN_UBER_MODE)) { - launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, true); - } - } else { - if (ConfigurationService.getBoolean(OOZIE_ACTION_LAUNCHER_PREFIX + HADOOP_YARN_UBER_MODE)) { - launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, true); - } - } - } - } - void injectLauncherTimelineServiceEnabled(Configuration launcherConf, Configuration actionConf) { // Getting delegation token for ATS. If tez-site.xml is present in distributed cache, turn on timeline service. if (actionConf.get("oozie.launcher." 
+ HADOOP_YARN_TIMELINE_SERVICE_ENABLED) == null @@ -322,151 +315,6 @@ public class JavaActionExecutor extends ActionExecutor { } } - void updateConfForUberMode(Configuration launcherConf) { - - // child.env - boolean hasConflictEnv = false; - String launcherMapEnv = launcherConf.get(HADOOP_MAP_JAVA_ENV); - if (launcherMapEnv == null) { - launcherMapEnv = launcherConf.get(HADOOP_CHILD_JAVA_ENV); - } - String amEnv = launcherConf.get(YARN_AM_ENV); - StringBuffer envStr = new StringBuffer(); - HashMap<String, List<String>> amEnvMap = null; - HashMap<String, List<String>> launcherMapEnvMap = null; - if (amEnv != null) { - envStr.append(amEnv); - amEnvMap = populateEnvMap(amEnv); - } - if (launcherMapEnv != null) { - launcherMapEnvMap = populateEnvMap(launcherMapEnv); - if (amEnvMap != null) { - Iterator<String> envKeyItr = launcherMapEnvMap.keySet().iterator(); - while (envKeyItr.hasNext()) { - String envKey = envKeyItr.next(); - if (amEnvMap.containsKey(envKey)) { - List<String> amValList = amEnvMap.get(envKey); - List<String> launcherValList = launcherMapEnvMap.get(envKey); - Iterator<String> valItr = launcherValList.iterator(); - while (valItr.hasNext()) { - String val = valItr.next(); - if (!amValList.contains(val)) { - hasConflictEnv = true; - break; - } - else { - valItr.remove(); - } - } - if (launcherValList.isEmpty()) { - envKeyItr.remove(); - } - } - } - } - } - if (hasConflictEnv) { - launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, false); - } - else { - if (launcherMapEnvMap != null) { - for (String key : launcherMapEnvMap.keySet()) { - List<String> launcherValList = launcherMapEnvMap.get(key); - for (String val : launcherValList) { - if (envStr.length() > 0) { - envStr.append(","); - } - envStr.append(key).append("=").append(val); - } - } - } - - launcherConf.set(YARN_AM_ENV, envStr.toString()); - - // memory.mb - int launcherMapMemoryMB = launcherConf.getInt(HADOOP_MAP_MEMORY_MB, 1536); - int amMemoryMB = launcherConf.getInt(YARN_AM_RESOURCE_MB, 1536); - 
// YARN_MEMORY_MB_MIN to provide buffer. - // suppose launcher map aggressively use high memory, need some - // headroom for AM - int memoryMB = Math.max(launcherMapMemoryMB, amMemoryMB) + YARN_MEMORY_MB_MIN; - // limit to 4096 in case of 32 bit - if (launcherMapMemoryMB < 4096 && amMemoryMB < 4096 && memoryMB > 4096) { - memoryMB = 4096; - } - launcherConf.setInt(YARN_AM_RESOURCE_MB, memoryMB); - - // We already made mapred.child.java.opts and - // mapreduce.map.java.opts equal, so just start with one of them - String launcherMapOpts = launcherConf.get(HADOOP_MAP_JAVA_OPTS, ""); - String amChildOpts = launcherConf.get(YARN_AM_COMMAND_OPTS); - StringBuilder optsStr = new StringBuilder(); - int heapSizeForMap = extractHeapSizeMB(launcherMapOpts); - int heapSizeForAm = extractHeapSizeMB(amChildOpts); - int heapSize = Math.max(heapSizeForMap, heapSizeForAm) + YARN_MEMORY_MB_MIN; - // limit to 3584 in case of 32 bit - if (heapSizeForMap < 4096 && heapSizeForAm < 4096 && heapSize > 3584) { - heapSize = 3584; - } - if (amChildOpts != null) { - optsStr.append(amChildOpts); - } - optsStr.append(" ").append(launcherMapOpts.trim()); - if (heapSize > 0) { - // append calculated total heap size to the end - optsStr.append(" ").append("-Xmx").append(heapSize).append("m"); - } - launcherConf.set(YARN_AM_COMMAND_OPTS, optsStr.toString().trim()); - } - } - - void updateConfForJavaTmpDir(Configuration conf) { - String amChildOpts = conf.get(YARN_AM_COMMAND_OPTS); - String oozieJavaTmpDirSetting = "-Djava.io.tmpdir=./tmp"; - if (amChildOpts != null && !amChildOpts.contains(JAVA_TMP_DIR_SETTINGS)) { - conf.set(YARN_AM_COMMAND_OPTS, amChildOpts + " " + oozieJavaTmpDirSetting); - } - } - - private HashMap<String, List<String>> populateEnvMap(String input) { - HashMap<String, List<String>> envMaps = new HashMap<String, List<String>>(); - String[] envEntries = input.split(","); - for (String envEntry : envEntries) { - String[] envKeyVal = envEntry.split("="); - String envKey = 
envKeyVal[0].trim(); - List<String> valList = envMaps.get(envKey); - if (valList == null) { - valList = new ArrayList<String>(); - } - valList.add(envKeyVal[1].trim()); - envMaps.put(envKey, valList); - } - return envMaps; - } - - public int extractHeapSizeMB(String input) { - int ret = 0; - if(input == null || input.equals("")) - return ret; - Matcher m = heapPattern.matcher(input); - String heapStr = null; - String heapNum = null; - // Grabs the last match which takes effect (in case that multiple Xmx options specified) - while (m.find()) { - heapStr = m.group(1); - heapNum = m.group(2); - } - if (heapStr != null) { - // when Xmx specified in Gigabyte - if(heapStr.endsWith("g") || heapStr.endsWith("G")) { - ret = Integer.parseInt(heapNum) * 1024; - } else { - ret = Integer.parseInt(heapNum); - } - } - return ret; - } - public static void parseJobXmlAndConfiguration(Context context, Element element, Path appPath, Configuration conf) throws IOException, ActionExecutorException, HadoopAccessorException, URISyntaxException { parseJobXmlAndConfiguration(context, element, appPath, conf, false); @@ -475,6 +323,7 @@ public class JavaActionExecutor extends ActionExecutor { public static void parseJobXmlAndConfiguration(Context context, Element element, Path appPath, Configuration conf, boolean isLauncher) throws IOException, ActionExecutorException, HadoopAccessorException, URISyntaxException { Namespace ns = element.getNamespace(); + @SuppressWarnings("unchecked") Iterator<Element> it = element.getChildren("job-xml", ns).iterator(); HashMap<String, FileSystem> filesystemsMap = new HashMap<String, FileSystem>(); HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); @@ -535,8 +384,8 @@ public class JavaActionExecutor extends ActionExecutor { throws ActionExecutorException { try { HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); - XConfiguration actionDefaults = 
has.createActionDefaultConf(actionConf.get(HADOOP_JOB_TRACKER), getType()); - XConfiguration.copy(actionDefaults, actionConf); + XConfiguration actionDefaults = has.createActionDefaultConf(actionConf.get(HADOOP_YARN_RM), getType()); + XConfiguration.injectDefaults(actionDefaults, actionConf); has.checkSupportedFilesystem(appPath.toUri()); // Set the Java Main Class for the Java action to give to the Java launcher @@ -546,7 +395,6 @@ public class JavaActionExecutor extends ActionExecutor { // set cancel.delegation.token in actionConf that child job doesn't cancel delegation token actionConf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false); - updateConfForJavaTmpDir(actionConf); setRootLoggerLevel(actionConf); return actionConf; } @@ -634,8 +482,8 @@ public class JavaActionExecutor extends ActionExecutor { } catch (Exception ex) { LOG.debug( - "Errors when add to DistributedCache. Path=" + uri.toString() + ", archive=" + archive + ", conf=" - + XmlUtils.prettyPrint(conf).toString()); + "Errors when add to DistributedCache. 
Path=" + Objects.toString(uri, "<null>") + ", archive=" + + archive + ", conf=" + XmlUtils.prettyPrint(conf).toString()); throw convertException(ex); } } @@ -758,7 +606,7 @@ public class JavaActionExecutor extends ActionExecutor { if (shareLibService != null) { try { List<Path> listOfPaths = shareLibService.getSystemLibJars(JavaActionExecutor.OOZIE_COMMON_LIBDIR); - if (listOfPaths == null || listOfPaths.isEmpty()) { + if (listOfPaths.isEmpty()) { throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "EJ001", "Could not locate Oozie sharelib"); } @@ -768,7 +616,7 @@ public class JavaActionExecutor extends ActionExecutor { DistributedCache.createSymlink(conf); } listOfPaths = shareLibService.getSystemLibJars(getType()); - if (listOfPaths != null) { + if (!listOfPaths.isEmpty()) { for (Path actionLibPath : listOfPaths) { JobUtils.addFileToClassPath(actionLibPath, conf, fs); DistributedCache.createSymlink(conf); @@ -885,7 +733,7 @@ public class JavaActionExecutor extends ActionExecutor { protected String getLauncherMain(Configuration launcherConf, Element actionXml) { - return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, JavaMain.class.getName()); + return launcherConf.get(LauncherAM.CONF_OOZIE_ACTION_MAIN_CLASS, JavaMain.class.getName()); } private void setJavaMain(Configuration actionConf, Element actionXml) { @@ -907,8 +755,8 @@ public class JavaActionExecutor extends ActionExecutor { } @SuppressWarnings("unchecked") - JobConf createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml, Configuration actionConf) - throws ActionExecutorException { + Configuration createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml, + Configuration actionConf) throws ActionExecutorException { try { // app path could be a file @@ -918,7 +766,7 @@ public class JavaActionExecutor extends ActionExecutor { } // launcher job configuration - JobConf launcherJobConf = 
createBaseHadoopConf(context, actionXml); + Configuration launcherJobConf = createBaseHadoopConf(context, actionXml); // cancel delegation token on a launcher job which stays alive till child job(s) finishes // otherwise (in mapred action), doesn't cancel not to disturb running child job launcherJobConf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", true); @@ -940,7 +788,7 @@ public class JavaActionExecutor extends ActionExecutor { launcherTime = context.getWorkflow().getCreatedTime().getTime(); } String actionYarnTag = getActionYarnTag(getWorkflowConf(context), context.getWorkflow(), action); - LauncherMapperHelper.setupYarnRestartHandling(launcherJobConf, actionConf, actionYarnTag, launcherTime); + LauncherHelper.setupYarnRestartHandling(launcherJobConf, actionConf, actionYarnTag, launcherTime); } else { LOG.info(MessageFormat.format("{0} is set to false, not setting YARN restart properties", @@ -953,15 +801,6 @@ public class JavaActionExecutor extends ActionExecutor { } setLibFilesArchives(context, actionXml, appPathRoot, launcherJobConf); - String jobName = launcherJobConf.get(HADOOP_JOB_NAME); - if (jobName == null || jobName.isEmpty()) { - jobName = XLog.format( - "oozie:launcher:T={0}:W={1}:A={2}:ID={3}", getType(), - context.getWorkflow().getAppName(), action.getName(), - context.getWorkflow().getId()); - launcherJobConf.setJobName(jobName); - } - // Inject Oozie job information if enabled. 
injectJobInfo(launcherJobConf, actionConf, context, action); @@ -981,23 +820,22 @@ public class JavaActionExecutor extends ActionExecutor { prepareXML = XmlUtils.prettyPrint(prepareElement).toString().trim(); } } - LauncherMapperHelper.setupLauncherInfo(launcherJobConf, jobId, actionId, actionDir, recoveryId, actionConf, + LauncherHelper.setupLauncherInfo(launcherJobConf, jobId, actionId, actionDir, recoveryId, actionConf, prepareXML); // Set the launcher Main Class - LauncherMapperHelper.setupMainClass(launcherJobConf, getLauncherMain(launcherJobConf, actionXml)); - LauncherMapperHelper.setupLauncherURIHandlerConf(launcherJobConf); - - LauncherMapperHelper.setupMaxOutputData(launcherJobConf, getMaxOutputData(actionConf)); - LauncherMapperHelper.setupMaxExternalStatsSize(launcherJobConf, maxExternalStatsSize); - LauncherMapperHelper.setupMaxFSGlob(launcherJobConf, maxFSGlobMax); + LauncherHelper.setupMainClass(launcherJobConf, getLauncherMain(launcherJobConf, actionXml)); + LauncherHelper.setupLauncherURIHandlerConf(launcherJobConf); + LauncherHelper.setupMaxOutputData(launcherJobConf, getMaxOutputData(actionConf)); + LauncherHelper.setupMaxExternalStatsSize(launcherJobConf, maxExternalStatsSize); + LauncherHelper.setupMaxFSGlob(launcherJobConf, maxFSGlobMax); List<Element> list = actionXml.getChildren("arg", ns); String[] args = new String[list.size()]; for (int i = 0; i < list.size(); i++) { args[i] = list.get(i).getTextTrim(); } - LauncherMapperHelper.setupMainArguments(launcherJobConf, args); + LauncherHelper.setupMainArguments(launcherJobConf, args); // backward compatibility flag - see OOZIE-2872 if (ConfigurationService.getBoolean(LauncherMapper.CONF_OOZIE_NULL_ARGS_ALLOWED)) { launcherJobConf.setBoolean(LauncherMapper.CONF_OOZIE_NULL_ARGS_ALLOWED, true); @@ -1022,16 +860,6 @@ public class JavaActionExecutor extends ActionExecutor { launcherJobConf.set(HADOOP_CHILD_JAVA_OPTS, opts.toString().trim()); launcherJobConf.set(HADOOP_MAP_JAVA_OPTS, 
opts.toString().trim()); - // setting for uber mode - if (launcherJobConf.getBoolean(HADOOP_YARN_UBER_MODE, false)) { - if (checkPropertiesToDisableUber(launcherJobConf)) { - launcherJobConf.setBoolean(HADOOP_YARN_UBER_MODE, false); - } - else { - updateConfForUberMode(launcherJobConf); - } - } - updateConfForJavaTmpDir(launcherJobConf); injectLauncherTimelineServiceEnabled(launcherJobConf, actionConf); // properties from action that are needed by the launcher (e.g. QUEUE NAME, ACLs) @@ -1055,23 +883,9 @@ public class JavaActionExecutor extends ActionExecutor { return maxActionOutputLen; } - private boolean checkPropertiesToDisableUber(Configuration launcherConf) { - boolean disable = false; - if (launcherConf.getBoolean(HADOOP_JOB_CLASSLOADER, false)) { - disable = true; - } - else if (launcherConf.getBoolean(HADOOP_USER_CLASSPATH_FIRST, false)) { - disable = true; - } - return disable; - } - protected void injectCallback(Context context, Configuration conf) { - String callback = context.getCallbackUrl("$jobStatus"); - if (conf.get("job.end.notification.url") != null) { - LOG.warn("Overriding the action job end notification URI"); - } - conf.set("job.end.notification.url", callback); + String callback = context.getCallbackUrl(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN); + conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_URL, callback); } void injectActionCallback(Context context, Configuration actionConf) { @@ -1082,7 +896,7 @@ public class JavaActionExecutor extends ActionExecutor { injectCallback(context, launcherConf); } - private void actionConfToLauncherConf(Configuration actionConf, JobConf launcherConf) { + private void actionConfToLauncherConf(Configuration actionConf, Configuration launcherConf) { for (String name : SPECIAL_PROPERTIES) { if (actionConf.get(name) != null && launcherConf.get("oozie.launcher." 
+ name) == null) { launcherConf.set(name, actionConf.get(name)); @@ -1090,9 +904,8 @@ public class JavaActionExecutor extends ActionExecutor { } } - public void submitLauncher(FileSystem actionFs, Context context, WorkflowAction action) throws ActionExecutorException { - JobClient jobClient = null; - boolean exception = false; + public void submitLauncher(FileSystem actionFs, final Context context, WorkflowAction action) throws ActionExecutorException { + YarnClient yarnClient = null; try { Path appPathRoot = new Path(context.getWorkflow().getAppPath()); @@ -1109,14 +922,6 @@ public class JavaActionExecutor extends ActionExecutor { LOG.debug("Setting LibFilesArchives "); setLibFilesArchives(context, actionXml, appPathRoot, actionConf); - String jobName = actionConf.get(HADOOP_JOB_NAME); - if (jobName == null || jobName.isEmpty()) { - jobName = XLog.format("oozie:action:T={0}:W={1}:A={2}:ID={3}", - getType(), context.getWorkflow().getAppName(), - action.getName(), context.getWorkflow().getId()); - actionConf.set(HADOOP_JOB_NAME, jobName); - } - injectActionCallback(context, actionConf); if(actionConf.get(ACL_MODIFY_JOB) == null || actionConf.get(ACL_MODIFY_JOB).trim().equals("")) { @@ -1130,15 +935,17 @@ public class JavaActionExecutor extends ActionExecutor { } // Setting the credential properties in launcher conf - JobConf credentialsConf = null; + Configuration credentialsConf = null; + HashMap<String, CredentialsProperties> credentialsProperties = setCredentialPropertyToActionConf(context, action, actionConf); + Credentials credentials = null; if (credentialsProperties != null) { - + credentials = new Credentials(); // Adding if action need to set more credential tokens - credentialsConf = new JobConf(false); + credentialsConf = new Configuration(false); XConfiguration.copy(actionConf, credentialsConf); - setCredentialTokens(credentialsConf, context, action, credentialsProperties); + setCredentialTokens(credentials, credentialsConf, context, action, 
credentialsProperties); // insert conf to action conf from credentialsConf for (Entry<String, String> entry : credentialsConf) { @@ -1147,49 +954,56 @@ public class JavaActionExecutor extends ActionExecutor { } } } + Configuration launcherJobConf = createLauncherConf(actionFs, context, action, actionXml, actionConf); - JobConf launcherJobConf = createLauncherConf(actionFs, context, action, actionXml, actionConf); - - LOG.debug("Creating Job Client for action " + action.getId()); - jobClient = createJobClient(context, launcherJobConf); - String launcherId = LauncherMapperHelper.getRecoveryId(launcherJobConf, context.getActionDir(), context + String consoleUrl; + String launcherId = LauncherHelper.getRecoveryId(launcherJobConf, context.getActionDir(), context .getRecoveryId()); boolean alreadyRunning = launcherId != null; - RunningJob runningJob; // if user-retry is on, always submit new launcher boolean isUserRetry = ((WorkflowActionBean)action).isUserRetry(); + LOG.debug("Creating yarnClient for action {0}", action.getId()); + yarnClient = createYarnClient(context, launcherJobConf); if (alreadyRunning && !isUserRetry) { - runningJob = jobClient.getJob(JobID.forName(launcherId)); - if (runningJob == null) { - String jobTracker = launcherJobConf.get(HADOOP_JOB_TRACKER); + try { + ApplicationId appId = ConverterUtils.toApplicationId(launcherId); + ApplicationReport report = yarnClient.getApplicationReport(appId); + consoleUrl = report.getTrackingUrl(); + } catch (RemoteException e) { + // caught when the application id does not exist + LOG.error("Got RemoteException from YARN", e); + String jobTracker = launcherJobConf.get(HADOOP_YARN_RM); throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017", "unknown job [{0}@{1}], cannot recover", launcherId, jobTracker); } } else { - LOG.debug("Submitting the job through Job Client for action " + action.getId()); - - // setting up propagation of the delegation token. 
- Services.get().get(HadoopAccessorService.class).addRMDelegationToken(jobClient, launcherJobConf); + // TODO: OYA: do we actually need an MR token? IIRC, it's issued by the JHS +// // setting up propagation of the delegation token. +// Token<DelegationTokenIdentifier> mrdt = null; +// HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); +// mrdt = jobClient.getDelegationToken(has +// .getMRDelegationTokenRenewer(launcherJobConf)); +// launcherJobConf.getCredentials().addToken(HadoopAccessorService.MR_TOKEN_ALIAS, mrdt); // insert credentials tokens to launcher job conf if needed - if (needInjectCredentials() && credentialsConf != null) { - for (Token<? extends TokenIdentifier> tk : credentialsConf.getCredentials().getAllTokens()) { + if (credentialsConf != null) { + for (Token<? extends TokenIdentifier> tk :credentials.getAllTokens()) { Text fauxAlias = new Text(tk.getKind() + "_" + tk.getService()); LOG.debug("ADDING TOKEN: " + fauxAlias); - launcherJobConf.getCredentials().addToken(fauxAlias, tk); + credentials.addToken(fauxAlias, tk); } - if (credentialsConf.getCredentials().numberOfSecretKeys() > 0) { + if (credentials.numberOfSecretKeys() > 0) { for (Entry<String, CredentialsProperties> entry : credentialsProperties.entrySet()) { CredentialsProperties credProps = entry.getValue(); if (credProps != null) { Text credName = new Text(credProps.getName()); - byte[] secKey = credentialsConf.getCredentials().getSecretKey(credName); + byte[] secKey = credentials.getSecretKey(credName); if (secKey != null) { LOG.debug("ADDING CREDENTIAL: " + credProps.getName()); - launcherJobConf.getCredentials().addSecretKey(credName, secKey); + credentials.addSecretKey(credName, secKey); } } } @@ -1198,55 +1012,129 @@ public class JavaActionExecutor extends ActionExecutor { else { LOG.info("No need to inject credentials."); } - runningJob = jobClient.submitJob(launcherJobConf); - if (runningJob == null) { - throw new 
ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017", - "Error submitting launcher for action [{0}]", action.getId()); - } - launcherId = runningJob.getID().toString(); - LOG.debug("After submission get the launcherId " + launcherId); + + String user = context.getWorkflow().getUser(); + + YarnClientApplication newApp = yarnClient.createApplication(); + ApplicationId appId = newApp.getNewApplicationResponse().getApplicationId(); + ApplicationSubmissionContext appContext = + createAppSubmissionContext(appId, launcherJobConf, user, context, actionConf, action.getName(), + credentials); + yarnClient.submitApplication(appContext); + + launcherId = appId.toString(); + LOG.debug("After submission get the launcherId [{0}]", launcherId); + ApplicationReport appReport = yarnClient.getApplicationReport(appId); + consoleUrl = appReport.getTrackingUrl(); } - String jobTracker = launcherJobConf.get(HADOOP_JOB_TRACKER); - String consoleUrl = runningJob.getTrackingURL(); + String jobTracker = launcherJobConf.get(HADOOP_YARN_RM); context.setStartData(launcherId, jobTracker, consoleUrl); } catch (Exception ex) { - exception = true; throw convertException(ex); } finally { - if (jobClient != null) { - try { - jobClient.close(); - } - catch (Exception e) { - if (exception) { - LOG.error("JobClient error: ", e); - } - else { - throw convertException(e); - } - } + if (yarnClient != null) { + Closeables.closeQuietly(yarnClient); } } } - private boolean needInjectCredentials() { - boolean methodExists = true; - Class klass; - try { - klass = Class.forName("org.apache.hadoop.mapred.JobConf"); - klass.getMethod("getCredentials"); - } - catch (ClassNotFoundException ex) { - methodExists = false; - } - catch (NoSuchMethodException ex) { - methodExists = false; + private ApplicationSubmissionContext createAppSubmissionContext(ApplicationId appId, Configuration launcherJobConf, + String user, Context context, Configuration actionConf, String actionName, + Credentials 
credentials) + throws IOException, HadoopAccessorException, URISyntaxException { + + ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class); + + String jobName = XLog.format( + "oozie:launcher:T={0}:W={1}:A={2}:ID={3}", getType(), + context.getWorkflow().getAppName(), actionName, + context.getWorkflow().getId()); + + appContext.setApplicationId(appId); + appContext.setApplicationName(jobName); + appContext.setApplicationType("Oozie Launcher"); + Priority pri = Records.newRecord(Priority.class); + int priority = 0; // TODO: OYA: Add a constant or a config + pri.setPriority(priority); + appContext.setPriority(pri); + appContext.setQueue("default"); // TODO: will be possible to set in <launcher> + ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class); + + // Set the resources to localize + Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); + ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(launcherJobConf); + MRApps.setupDistributedCache(launcherJobConf, localResources); + // Add the Launcher and Action configs as Resources + HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); + LocalResource launcherJobConfLR = has.createLocalResourceForConfigurationFile(LauncherAM.LAUNCHER_JOB_CONF_XML, user, + launcherJobConf, context.getAppFileSystem().getUri(), context.getActionDir()); + localResources.put(LauncherAM.LAUNCHER_JOB_CONF_XML, launcherJobConfLR); + LocalResource actionConfLR = has.createLocalResourceForConfigurationFile(LauncherAM.ACTION_CONF_XML, user, actionConf, + context.getAppFileSystem().getUri(), context.getActionDir()); + localResources.put(LauncherAM.ACTION_CONF_XML, actionConfLR); + amContainer.setLocalResources(localResources); + + // Set the environment variables + Map<String, String> env = new HashMap<String, String>(); + // This adds the Hadoop jars to the classpath in the Launcher JVM + 
ClasspathUtils.setupClasspath(env, launcherJobConf); + + if (needToAddMapReduceToClassPath()) { + ClasspathUtils.addMapReduceToClasspath(env, launcherJobConf); + } + + addActionSpecificEnvVars(env); + amContainer.setEnvironment(Collections.unmodifiableMap(env)); + + // Set the command + List<String> vargs = new ArrayList<String>(6); + vargs.add(Apps.crossPlatformify(ApplicationConstants.Environment.JAVA_HOME.toString()) + + "/bin/java"); + + vargs.add("-Dlog4j.configuration=container-log4j.properties"); + vargs.add("-Dlog4j.debug=true"); + vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR + "=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR); + vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_SIZE + "=" + 0); + vargs.add("-Dhadoop.root.logger=INFO,CLA"); + vargs.add("-Dhadoop.root.logfile=" + TaskLog.LogName.SYSLOG); + vargs.add("-Dsubmitter.user=" + context.getWorkflow().getUser()); + + Path amTmpDir = new Path(Apps.crossPlatformify(ApplicationConstants.Environment.PWD.toString()), + YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR); + vargs.add("-Djava.io.tmpdir=" + amTmpDir); + + vargs.add(LauncherAM.class.getCanonicalName()); + vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + + Path.SEPARATOR + ApplicationConstants.STDOUT); + vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + + Path.SEPARATOR + ApplicationConstants.STDERR); + StringBuilder mergedCommand = new StringBuilder(); + for (CharSequence str : vargs) { + mergedCommand.append(str).append(" "); + } + + List<String> vargsFinal = ImmutableList.of(mergedCommand.toString()); + LOG.debug("Command to launch container for ApplicationMaster is: {0}", mergedCommand); + amContainer.setCommands(vargsFinal); + appContext.setAMContainerSpec(amContainer); + + // Set tokens + if (credentials != null) { + DataOutputBuffer dob = new DataOutputBuffer(); + credentials.writeTokenStorageToStream(dob); + amContainer.setTokens(ByteBuffer.wrap(dob.getData(), 0, dob.getLength())); } - return 
methodExists; + // Set Resources + // TODO: OYA: make resources allocated for the AM configurable and choose good defaults (memory MB, vcores) + Resource resource = Resource.newInstance(2048, 1); + appContext.setResource(resource); + appContext.setCancelTokensWhenComplete(true); + + return appContext; } protected HashMap<String, CredentialsProperties> setCredentialPropertyToActionConf(Context context, @@ -1258,15 +1146,16 @@ public class JavaActionExecutor extends ActionExecutor { if ("false".equals(actionConf.get(OOZIE_CREDENTIALS_SKIP)) || !wfJobConf.getBoolean(OOZIE_CREDENTIALS_SKIP, ConfigurationService.getBoolean(OOZIE_CREDENTIALS_SKIP))) { credPropertiesMap = getActionCredentialsProperties(context, action); - if (credPropertiesMap != null) { - for (String key : credPropertiesMap.keySet()) { - CredentialsProperties prop = credPropertiesMap.get(key); - if (prop != null) { + if (!credPropertiesMap.isEmpty()) { + for (Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) { + if (entry.getValue() != null) { + CredentialsProperties prop = entry.getValue(); LOG.debug("Credential Properties set for action : " + action.getId()); - for (String property : prop.getProperties().keySet()) { - actionConf.set(property, prop.getProperties().get(property)); - LOG.debug("property : '" + property + "', value : '" + prop.getProperties().get(property) - + "'"); + for (Entry<String, String> propEntry : prop.getProperties().entrySet()) { + String key = propEntry.getKey(); + String value = propEntry.getValue(); + actionConf.set(key, value); + LOG.debug("property : '" + key + "', value : '" + value + "'"); } } } @@ -1285,20 +1174,20 @@ public class JavaActionExecutor extends ActionExecutor { return credPropertiesMap; } - protected void setCredentialTokens(JobConf jobconf, Context context, WorkflowAction action, + protected void setCredentialTokens(Credentials credentials, Configuration jobconf, Context context, WorkflowAction action, HashMap<String, 
CredentialsProperties> credPropertiesMap) throws Exception { if (context != null && action != null && credPropertiesMap != null) { // Make sure we're logged into Kerberos; if not, or near expiration, it will relogin - CredentialsProvider.ensureKerberosLogin(); + CredentialsProviderFactory.ensureKerberosLogin(); for (Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) { String credName = entry.getKey(); CredentialsProperties credProps = entry.getValue(); if (credProps != null) { - CredentialsProvider credProvider = new CredentialsProvider(credProps.getType()); - Credentials credentialObject = credProvider.createCredentialObject(); - if (credentialObject != null) { - credentialObject.addtoJobConf(jobconf, credProps, context); + CredentialsProvider tokenProvider = CredentialsProviderFactory.getInstance() + .createCredentialsProvider(credProps.getType()); + if (tokenProvider != null) { + tokenProvider.updateCredentials(credentials, jobconf, credProps, context); LOG.debug("Retrieved Credential '" + credName + "' for action " + action.getId()); } else { @@ -1310,7 +1199,6 @@ public class JavaActionExecutor extends ActionExecutor { } } } - } protected HashMap<String, CredentialsProperties> getActionCredentialsProperties(Context context, @@ -1424,19 +1312,22 @@ public class JavaActionExecutor extends ActionExecutor { * @return JobClient * @throws HadoopAccessorException */ - protected JobClient createJobClient(Context context, JobConf jobConf) throws HadoopAccessorException { + protected JobClient createJobClient(Context context, Configuration jobConf) throws HadoopAccessorException { String user = context.getWorkflow().getUser(); - String group = context.getWorkflow().getGroup(); return Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf); } - protected RunningJob getRunningJob(Context context, WorkflowAction action, JobClient jobClient) throws Exception{ - String externalId = action.getExternalId(); - RunningJob runningJob 
= null; - if (externalId != null) { - runningJob = jobClient.getJob(JobID.forName(externalId)); - } - return runningJob; + /** + * Create yarn client object + * + * @param context + * @param jobConf + * @return YarnClient + * @throws HadoopAccessorException + */ + protected YarnClient createYarnClient(Context context, Configuration jobConf) throws HadoopAccessorException { + String user = context.getWorkflow().getUser(); + return Services.get().get(HadoopAccessorService.class).createYarnClient(user, jobConf); } /** @@ -1448,142 +1339,141 @@ public class JavaActionExecutor extends ActionExecutor { return action.getExternalId(); } + /** + * If returns true, it means that we have to add Hadoop MR jars to the classpath. + * Subclasses should override this method if necessary. By default we don't add + * MR jars to the classpath. + * @return false by default + */ + protected boolean needToAddMapReduceToClassPath() { + return false; + } + + /** + * Adds action-specific environment variables. Default implementation is no-op. + * Subclasses should override this method if necessary. 
+ * + */ + protected void addActionSpecificEnvVars(Map<String, String> env) { + // nop + } + @Override public void check(Context context, WorkflowAction action) throws ActionExecutorException { - JobClient jobClient = null; - boolean exception = false; + boolean fallback = false; + LOG = XLog.resetPrefix(LOG); LogUtils.setLogInfo(action); + YarnClient yarnClient = null; try { Element actionXml = XmlUtils.parseXml(action.getConf()); + Configuration jobConf = createBaseHadoopConf(context, actionXml); FileSystem actionFs = context.getAppFileSystem(); - JobConf jobConf = createBaseHadoopConf(context, actionXml); - jobClient = createJobClient(context, jobConf); - RunningJob runningJob = getRunningJob(context, action, jobClient); - if (runningJob == null) { - context.setExecutionData(FAILED, null); - throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017", - "Could not lookup launched hadoop Job ID [{0}] which was associated with " + - " action [{1}]. Failing this action!", getActualExternalId(action), action.getId()); - } - if (runningJob.isComplete()) { + yarnClient = createYarnClient(context, jobConf); + FinalApplicationStatus appStatus = null; + try { + ApplicationReport appReport = + yarnClient.getApplicationReport(ConverterUtils.toApplicationId(action.getExternalId())); + YarnApplicationState appState = appReport.getYarnApplicationState(); + if (appState == YarnApplicationState.FAILED || appState == YarnApplicationState.FINISHED + || appState == YarnApplicationState.KILLED) { + appStatus = appReport.getFinalApplicationStatus(); + } + + } catch (Exception ye) { + LOG.warn("Exception occurred while checking Launcher AM status; will try checking action data file instead ", ye); + // Fallback to action data file if we can't find the Launcher AM (maybe it got purged) + fallback = true; + } + if (appStatus != null || fallback) { Path actionDir = context.getActionDir(); - String newId = null; // load sequence file into object - Map<String, 
String> actionData = LauncherMapperHelper.getActionData(actionFs, actionDir, jobConf); - if (actionData.containsKey(LauncherMapper.ACTION_DATA_NEW_ID)) { - newId = actionData.get(LauncherMapper.ACTION_DATA_NEW_ID); - String launcherId = action.getExternalId(); - runningJob = jobClient.getJob(JobID.forName(newId)); - if (runningJob == null) { - context.setExternalStatus(FAILED); + Map<String, String> actionData = LauncherHelper.getActionData(actionFs, actionDir, jobConf); + if (fallback) { + String finalStatus = actionData.get(LauncherAM.ACTION_DATA_FINAL_STATUS); + if (finalStatus != null) { + appStatus = FinalApplicationStatus.valueOf(finalStatus); + } else { + context.setExecutionData(FAILED, null); throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017", - "Unknown hadoop job [{0}] associated with action [{1}]. Failing this action!", newId, - action.getId()); + "Unknown hadoop job [{0}] associated with action [{1}] and couldn't determine status from" + + " action data. 
Failing this action!", action.getExternalId(), action.getId()); } - context.setExternalChildIDs(newId); - LOG.info(XLog.STD, "External ID swap, old ID [{0}] new ID [{1}]", launcherId, - newId); } - else { - String externalIDs = actionData.get(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS); - if (externalIDs != null) { - context.setExternalChildIDs(externalIDs); - LOG.info(XLog.STD, "Hadoop Jobs launched : [{0}]", externalIDs); + + String externalID = actionData.get(LauncherAM.ACTION_DATA_NEW_ID); // MapReduce was launched + if (externalID != null) { + context.setExternalChildIDs(externalID); + LOG.info(XLog.STD, "Hadoop Job was launched : [{0}]", externalID); + } + + // Multiple child IDs - Pig or Hive action + String externalIDs = actionData.get(LauncherAM.ACTION_DATA_EXTERNAL_CHILD_IDS); + if (externalIDs != null) { + context.setExternalChildIDs(externalIDs); + LOG.info(XLog.STD, "External Child IDs : [{0}]", externalIDs); + + } + + LOG.info(XLog.STD, "action completed, external ID [{0}]", action.getExternalId()); + context.setExecutionData(appStatus.toString(), null); + if (appStatus == FinalApplicationStatus.SUCCEEDED) { + if (getCaptureOutput(action) && LauncherHelper.hasOutputData(actionData)) { + context.setExecutionData(SUCCEEDED, PropertiesUtils.stringToProperties(actionData + .get(LauncherAM.ACTION_DATA_OUTPUT_PROPS))); + LOG.info(XLog.STD, "action produced output"); } - else if (LauncherMapperHelper.hasOutputData(actionData)) { - // Load stored Hadoop jobs ids and promote them as external child ids - // This is for jobs launched with older release during upgrade to Oozie 4.3 - Properties props = PropertiesUtils.stringToProperties(actionData - .get(LauncherMapper.ACTION_DATA_OUTPUT_PROPS)); - if (props.get(LauncherMain.HADOOP_JOBS) != null) { - externalIDs = (String) props.get(LauncherMain.HADOOP_JOBS); - context.setExternalChildIDs(externalIDs); - LOG.info(XLog.STD, "Hadoop Jobs launched : [{0}]", externalIDs); - } + else { + 
context.setExecutionData(SUCCEEDED, null); } - } - if (runningJob.isComplete()) { - // fetching action output and stats for the Map-Reduce action. - if (newId != null) { - actionData = LauncherMapperHelper.getActionData(actionFs, context.getActionDir(), jobConf); + if (LauncherHelper.hasStatsData(actionData)) { + context.setExecutionStats(actionData.get(LauncherAM.ACTION_DATA_STATS)); + LOG.info(XLog.STD, "action produced stats"); } - LOG.info(XLog.STD, "action completed, external ID [{0}]", - action.getExternalId()); - if (LauncherMapperHelper.isMainSuccessful(runningJob)) { - if (getCaptureOutput(action) && LauncherMapperHelper.hasOutputData(actionData)) { - context.setExecutionData(SUCCEEDED, PropertiesUtils.stringToProperties(actionData - .get(LauncherMapper.ACTION_DATA_OUTPUT_PROPS))); - LOG.info(XLog.STD, "action produced output"); + getActionData(actionFs, action, context); + } + else { + String errorReason; + if (actionData.containsKey(LauncherAM.ACTION_DATA_ERROR_PROPS)) { + Properties props = PropertiesUtils.stringToProperties(actionData + .get(LauncherAM.ACTION_DATA_ERROR_PROPS)); + String errorCode = props.getProperty("error.code"); + if ("0".equals(errorCode)) { + errorCode = "JA018"; } - else { - context.setExecutionData(SUCCEEDED, null); + if ("-1".equals(errorCode)) { + errorCode = "JA019"; } - if (LauncherMapperHelper.hasStatsData(actionData)) { - context.setExecutionStats(actionData.get(LauncherMapper.ACTION_DATA_STATS)); - LOG.info(XLog.STD, "action produced stats"); + errorReason = props.getProperty("error.reason"); + LOG.warn("Launcher ERROR, reason: {0}", errorReason); + String exMsg = props.getProperty("exception.message"); + String errorInfo = (exMsg != null) ? 
exMsg : errorReason; + context.setErrorInfo(errorCode, errorInfo); + String exStackTrace = props.getProperty("exception.stacktrace"); + if (exMsg != null) { + LOG.warn("Launcher exception: {0}{E}{1}", exMsg, exStackTrace); } - getActionData(actionFs, runningJob, action, context); } else { - String errorReason; - if (actionData.containsKey(LauncherMapper.ACTION_DATA_ERROR_PROPS)) { - Properties props = PropertiesUtils.stringToProperties(actionData - .get(LauncherMapper.ACTION_DATA_ERROR_PROPS)); - String errorCode = props.getProperty("error.code"); - if ("0".equals(errorCode)) { - errorCode = "JA018"; - } - if ("-1".equals(errorCode)) { - errorCode = "JA019"; - } - errorReason = props.getProperty("error.reason"); - LOG.warn("Launcher ERROR, reason: {0}", errorReason); - String exMsg = props.getProperty("exception.message"); - String errorInfo = (exMsg != null) ? exMsg : errorReason; - context.setErrorInfo(errorCode, errorInfo); - String exStackTrace = props.getProperty("exception.stacktrace"); - if (exMsg != null) { - LOG.warn("Launcher exception: {0}{E}{1}", exMsg, exStackTrace); - } - } - else { - errorReason = XLog.format("LauncherMapper died, check Hadoop LOG for job [{0}:{1}]", action - .getTrackerUri(), action.getExternalId()); - LOG.warn(errorReason); - } - context.setExecutionData(FAILED_KILLED, null); + errorReason = XLog.format("Launcher AM died, check Hadoop LOG for job [{0}:{1}]", action + .getTrackerUri(), action.getExternalId()); + LOG.warn(errorReason); } - } - else { - context.setExternalStatus("RUNNING"); - LOG.info(XLog.STD, "checking action, hadoop job ID [{0}] status [RUNNING]", - runningJob.getID()); + context.setExecutionData(FAILED_KILLED, null); } } else { - context.setExternalStatus("RUNNING"); + context.setExternalStatus(YarnApplicationState.RUNNING.toString()); LOG.info(XLog.STD, "checking action, hadoop job ID [{0}] status [RUNNING]", - runningJob.getID()); + action.getExternalId()); } } catch (Exception ex) { LOG.warn("Exception in 
check(). Message[{0}]", ex.getMessage(), ex); - exception = true; throw convertException(ex); } finally { - if (jobClient != null) { - try { - jobClient.close(); - } - catch (Exception e) { - if (exception) { - LOG.error("JobClient error: ", e); - } - else { - throw convertException(e); - } - } + if (yarnClient != null) { + IOUtils.closeQuietly(yarnClient); } } } @@ -1591,14 +1481,12 @@ public class JavaActionExecutor extends ActionExecutor { /** * Get the output data of an action. Subclasses should override this method * to get action specific output data. - * * @param actionFs the FileSystem object - * @param runningJob the runningJob * @param action the Workflow action * @param context executor context * */ - protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context) + protected void getActionData(FileSystem actionFs, WorkflowAction action, Context context) throws HadoopAccessorException, JDOMException, IOException, URISyntaxException { } @@ -1611,55 +1499,39 @@ public class JavaActionExecutor extends ActionExecutor { @Override public void kill(Context context, WorkflowAction action) throws ActionExecutorException { - JobClient jobClient = null; - boolean exception = false; + YarnClient yarnClient = null; try { Element actionXml = XmlUtils.parseXml(action.getConf()); - final JobConf jobConf = createBaseHadoopConf(context, actionXml); - WorkflowJob wfJob = context.getWorkflow(); - Configuration conf = null; - if ( wfJob.getConf() != null ) { - conf = new XConfiguration(new StringReader(wfJob.getConf())); - } - String launcherTag = LauncherMapperHelper.getActionYarnTag(conf, wfJob.getParentId(), action); - jobConf.set(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS, LauncherMapperHelper.getTag(launcherTag)); - jobConf.set(LauncherMainHadoopUtils.OOZIE_JOB_LAUNCH_TIME, Long.toString(action.getStartTime().getTime())); - UserGroupInformation ugi = Services.get().get(UserGroupInformationService.class) - 
.getProxyUser(context.getWorkflow().getUser()); - ugi.doAs(new PrivilegedExceptionAction<Void>() { - @Override - public Void run() throws Exception { - LauncherMainHadoopUtils.killChildYarnJobs(jobConf); - return null; + + final Configuration jobConf = createBaseHadoopConf(context, actionXml); + String launcherTag = LauncherHelper.getActionYarnTag(jobConf, context.getWorkflow().getParentId(), action); + jobConf.set(LauncherMain.CHILD_MAPREDUCE_JOB_TAGS, LauncherHelper.getTag(launcherTag)); + yarnClient = createYarnClient(context, jobConf); + if(action.getExternalId() != null) { + yarnClient.killApplication(ConverterUtils.toApplicationId(action.getExternalId())); + } + for(ApplicationId id : LauncherMain.getChildYarnJobs(jobConf, ApplicationsRequestScope.ALL, + action.getStartTime().getTime())){ + try { + yarnClient.killApplication(id); + } catch (Exception e) { + LOG.warn("Could not kill child of {0}, {1}", action.getExternalId(), id); } - }); - jobClient = createJobClient(context, jobConf); - RunningJob runningJob = getRunningJob(context, action, jobClient); - if (runningJob != null) { - runningJob.killJob(); } + context.setExternalStatus(KILLED); context.setExecutionData(KILLED, null); - } - catch (Exception ex) { - exception = true; + } catch (Exception ex) { + LOG.error("Error when killing YARN application", ex); throw convertException(ex); - } - finally { + } finally { try { FileSystem actionFs = context.getAppFileSystem(); cleanUpActionDir(actionFs, context); - if (jobClient != null) { - jobClient.close(); - } - } - catch (Exception ex) { - if (exception) { - LOG.error("Error: ", ex); - } - else { - throw convertException(ex); - } + Closeables.closeQuietly(yarnClient); + } catch (Exception ex) { + LOG.error("Error when cleaning up action dir", ex); + throw convertException(ex); } } } @@ -1754,7 +1626,7 @@ public class JavaActionExecutor extends ActionExecutor { HadoopAccessorException, URISyntaxException { } - private void injectJobInfo(JobConf 
launcherJobConf, Configuration actionConf, Context context, WorkflowAction action) { + private void injectJobInfo(Configuration launcherJobConf, Configuration actionConf, Context context, WorkflowAction action) { if (OozieJobInfo.isJobInfoEnabled()) { try { OozieJobInfo jobInfo = new OozieJobInfo(actionConf, context, action);
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherHelper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/LauncherHelper.java b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherHelper.java new file mode 100644 index 0000000..f80141c --- /dev/null +++ b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherHelper.java @@ -0,0 +1,322 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oozie.action.hadoop; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.math.BigInteger; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.Counters; +import org.apache.hadoop.mapred.RunningJob; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.oozie.client.OozieClient; +import org.apache.oozie.client.WorkflowAction; +import org.apache.oozie.service.HadoopAccessorException; +import org.apache.oozie.service.HadoopAccessorService; +import org.apache.oozie.service.Services; +import org.apache.oozie.service.URIHandlerService; +import org.apache.oozie.service.UserGroupInformationService; +import org.apache.oozie.util.IOUtils; +import org.apache.oozie.util.PropertiesUtils; + +public class LauncherHelper { + + public static final String OOZIE_ACTION_YARN_TAG = "oozie.action.yarn.tag"; + + private static final LauncherInputFormatClassLocator launcherInputFormatClassLocator = new LauncherInputFormatClassLocator(); + + public static String getRecoveryId(Configuration launcherConf, Path actionDir, String recoveryId) + throws HadoopAccessorException, IOException { + String jobId = null; + Path recoveryFile = new Path(actionDir, recoveryId); + FileSystem fs = Services.get().get(HadoopAccessorService.class) + .createFileSystem(launcherConf.get("user.name"),recoveryFile.toUri(), launcherConf); + + if (fs.exists(recoveryFile)) { + InputStream is = fs.open(recoveryFile); 
+ BufferedReader reader = new BufferedReader(new InputStreamReader(is)); + jobId = reader.readLine(); + reader.close(); + } + return jobId; + + } + + public static void setupMainClass(Configuration launcherConf, String javaMainClass) { + // Only set the javaMainClass if its not null or empty string, this way the user can override the action's main class via + // <configuration> property + if (javaMainClass != null && !javaMainClass.equals("")) { + launcherConf.set(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, javaMainClass); + } + } + + public static void setupLauncherURIHandlerConf(Configuration launcherConf) { + for(Map.Entry<String, String> entry : Services.get().get(URIHandlerService.class).getLauncherConfig()) { + launcherConf.set(entry.getKey(), entry.getValue()); + } + } + + public static void setupMainArguments(Configuration launcherConf, String[] args) { + launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_MAIN_ARG_COUNT, args.length); + for (int i = 0; i < args.length; i++) { + launcherConf.set(LauncherMapper.CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + i, args[i]); + } + } + + public static void setupMaxOutputData(Configuration launcherConf, int maxOutputData) { + launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, maxOutputData); + } + + /** + * Set the maximum value of stats data + * + * @param launcherConf the oozie launcher configuration + * @param maxStatsData the maximum allowed size of stats data + */ + public static void setupMaxExternalStatsSize(Configuration launcherConf, int maxStatsData){ + launcherConf.setInt(LauncherMapper.CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE, maxStatsData); + } + + /** + * Set the maximum number of globbed files/dirs + * + * @param launcherConf the oozie launcher configuration + * @param fsGlobMax the maximum number of files/dirs for FS operation + */ + public static void setupMaxFSGlob(Configuration launcherConf, int fsGlobMax){ + launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_FS_GLOB_MAX, fsGlobMax); + } + 
+ public static void setupLauncherInfo(Configuration launcherConf, String jobId, String actionId, Path actionDir, + String recoveryId, Configuration actionConf, String prepareXML) throws IOException, HadoopAccessorException { + + launcherConf.set(LauncherMapper.OOZIE_JOB_ID, jobId); + launcherConf.set(LauncherMapper.OOZIE_ACTION_ID, actionId); + launcherConf.set(LauncherMapper.OOZIE_ACTION_DIR_PATH, actionDir.toString()); + launcherConf.set(LauncherMapper.OOZIE_ACTION_RECOVERY_ID, recoveryId); + launcherConf.set(LauncherMapper.ACTION_PREPARE_XML, prepareXML); + + actionConf.set(LauncherMapper.OOZIE_JOB_ID, jobId); + actionConf.set(LauncherMapper.OOZIE_ACTION_ID, actionId); + + if (Services.get().getConf().getBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", false)) { + List<String> purgedEntries = new ArrayList<String>(); + Collection<String> entries = actionConf.getStringCollection("mapreduce.job.cache.files"); + for (String entry : entries) { + if (entry.contains("#")) { + purgedEntries.add(entry); + } + } + actionConf.setStrings("mapreduce.job.cache.files", purgedEntries.toArray(new String[purgedEntries.size()])); + launcherConf.setBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", true); + } + } + + public static void setupYarnRestartHandling(Configuration launcherJobConf, Configuration actionConf, String launcherTag, + long launcherTime) + throws NoSuchAlgorithmException { + launcherJobConf.setLong(LauncherMain.OOZIE_JOB_LAUNCH_TIME, launcherTime); + // Tags are limited to 100 chars so we need to hash them to make sure (the actionId otherwise doesn't have a max length) + String tag = getTag(launcherTag); + // keeping the oozie.child.mapreduce.job.tags instead of mapreduce.job.tags to avoid killing launcher itself. + // mapreduce.job.tags should only go to child job launch by launcher. 
+        actionConf.set(LauncherMain.CHILD_MAPREDUCE_JOB_TAGS, tag);
+    }
+
+    /**
+     * Build the tag string for a launcher: the literal prefix {@code "oozie-"} followed by the
+     * hexadecimal MD5 digest of {@code launcherTag}.
+     * <p>
+     * The hex string is produced by {@link BigInteger#toString(int)}, so leading zero digits of the
+     * digest are not preserved.
+     *
+     * @param launcherTag text to hash; must not be null
+     * @return {@code "oozie-"} + hex MD5 of the UTF-8 bytes of {@code launcherTag}
+     * @throws NoSuchAlgorithmException if the MD5 algorithm is not available
+     */
+    public static String getTag(String launcherTag) throws NoSuchAlgorithmException {
+        MessageDigest digest = MessageDigest.getInstance("MD5");
+        // Hash the whole encoded byte array. Using String.length() as the byte count silently
+        // truncates the input as soon as a character needs more than one byte, and the platform
+        // default charset may differ between JVMs. Pure-ASCII tags hash exactly as before.
+        byte[] tagBytes = launcherTag.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        digest.update(tagBytes, 0, tagBytes.length);
+        return "oozie-" + new BigInteger(1, digest.digest()).toString(16);
+    }
+
+    /**
+     * Tell whether the launcher job has finished.
+     *
+     * @param runningJob launcher job handle
+     * @return the value of {@code runningJob.isComplete()}
+     * @throws IOException if the job status cannot be obtained
+     */
+    public static boolean isMainDone(RunningJob runningJob) throws IOException {
+        return runningJob.isComplete();
+    }
+
+    /**
+     * Tell whether the launcher job succeeded. A job that reports success is still treated as failed
+     * when its {@code LauncherMapper.COUNTER_LAUNCHER_ERROR} counter is non-zero.
+     *
+     * @param runningJob launcher job handle
+     * @return true if the job reports success and no launcher error was counted
+     * @throws IOException if the job status or counters cannot be obtained
+     */
+    public static boolean isMainSuccessful(RunningJob runningJob) throws IOException {
+        boolean succeeded = runningJob.isSuccessful();
+        if (succeeded) {
+            Counters counters = runningJob.getCounters();
+            if (counters != null) {
+                Counters.Group group = counters.getGroup(LauncherMapper.COUNTER_GROUP);
+                if (group != null) {
+                    succeeded = group.getCounter(LauncherMapper.COUNTER_LAUNCHER_ERROR) == 0;
+                }
+            }
+        }
+        return succeeded;
+    }
+
+    /**
+     * Determine whether action has external child jobs or not
+     *
+     * @param actionData action data map, as returned by {@code getActionData}
+     * @return true if the map contains {@code LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS}
+     * @throws IOException never thrown by this implementation; kept in the signature for callers
+     */
+    public static boolean hasExternalChildJobs(Map<String, String> actionData) throws IOException {
+        return actionData.containsKey(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS);
+    }
+
+    /**
+     * Determine whether action has output data or not
+     *
+     * @param actionData action data map, as returned by {@code getActionData}
+     * @return true if the map contains {@code LauncherMapper.ACTION_DATA_OUTPUT_PROPS}
+     * @throws IOException never thrown by this implementation; kept in the signature for callers
+     */
+    public static boolean hasOutputData(Map<String, String> actionData) throws IOException {
+        return actionData.containsKey(LauncherMapper.ACTION_DATA_OUTPUT_PROPS);
+    }
+
+    /**
+     * Determine whether action has external stats or not
+     *
+     * @param actionData action data map, as returned by {@code getActionData}
+     * @return true if the map contains {@code LauncherMapper.ACTION_DATA_STATS}
+     * @throws IOException never thrown by this implementation; kept in the signature for callers
+     */
+    public static boolean hasStatsData(Map<String, String> actionData) throws IOException {
+        return actionData.containsKey(LauncherMapper.ACTION_DATA_STATS);
+    }
+
+    /**
+     * Determine whether action has new id (id swap) or not
+     *
+     * @param actionData action data map, as returned by {@code getActionData}
+     * @return true if the map contains {@code LauncherMapper.ACTION_DATA_NEW_ID}
+     * @throws IOException never thrown by this implementation; kept in the signature for callers
+     */
+    public static boolean hasIdSwap(Map<String, String> actionData) throws IOException {
+        return actionData.containsKey(LauncherMapper.ACTION_DATA_NEW_ID);
+    }
+
+    /**
+     * Get the sequence file path storing all action data
+     *
+     * @param actionDir action directory
+     * @return {@code actionDir} resolved against {@code LauncherMapper.ACTION_DATA_SEQUENCE_FILE}
+     */
+    public static Path getActionDataSequenceFilePath(Path actionDir) {
+        return new Path(actionDir, LauncherMapper.ACTION_DATA_SEQUENCE_FILE);
+    }
+
+    /**
+     * Utility function to load the contents of action data sequence file into
+     * memory object. The read runs as the proxy user named by {@code OozieClient.USER_NAME}
+     * in {@code conf}.
+     * <p>
+     * If the sequence file does not exist, the individual legacy files in {@code actionDir}
+     * are read instead (backward compatibility).
+     *
+     * @param fs Action Filesystem
+     * @param actionDir Path
+     * @param conf Configuration
+     * @return Map action data
+     * @throws IOException if reading from the filesystem fails
+     * @throws InterruptedException if the privileged action is interrupted
+     */
+    public static Map<String, String> getActionData(final FileSystem fs, final Path actionDir, final Configuration conf)
+            throws IOException, InterruptedException {
+        UserGroupInformationService ugiService = Services.get().get(UserGroupInformationService.class);
+        UserGroupInformation ugi = ugiService.getProxyUser(conf.get(OozieClient.USER_NAME));
+
+        return ugi.doAs(new PrivilegedExceptionAction<Map<String, String>>() {
+            @Override
+            public Map<String, String> run() throws IOException {
+                Map<String, String> ret = new HashMap<String, String>();
+                Path seqFilePath = getActionDataSequenceFilePath(actionDir);
+                if (fs.exists(seqFilePath)) {
+                    SequenceFile.Reader seqFile = new SequenceFile.Reader(fs, seqFilePath, conf);
+                    try {
+                        Text key = new Text(), value = new Text();
+                        while (seqFile.next(key, value)) {
+                            ret.put(key.toString(), value.toString());
+                        }
+                    }
+                    finally {
+                        // Release the reader even when next() fails part-way through the file.
+                        seqFile.close();
+                    }
+                }
+                else { // maintain backward-compatibility. to be deprecated
+                    readLegacyActionData(fs, actionDir, conf, ret);
+                }
+                return ret;
+            }
+        });
+    }
+
+    /**
+     * Populate {@code ret} from the per-item legacy files found directly under {@code actionDir}.
+     */
+    private static void readLegacyActionData(FileSystem fs, Path actionDir, Configuration conf,
+            Map<String, String> ret) throws IOException {
+        org.apache.hadoop.fs.FileStatus[] files = fs.listStatus(actionDir);
+        if (files == null) {
+            return;
+        }
+        for (org.apache.hadoop.fs.FileStatus status : files) {
+            Path file = status.getPath();
+            if (file.equals(new Path(actionDir, "externalChildIds.properties"))) {
+                ret.put(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS, readLegacyFileAsString(fs, file));
+            }
+            else if (file.equals(new Path(actionDir, "newId.properties"))) {
+                Properties props = readLegacyFileAsProperties(fs, file, -1);
+                ret.put(LauncherMapper.ACTION_DATA_NEW_ID, props.getProperty("id"));
+            }
+            else if (file.equals(new Path(actionDir, LauncherMapper.ACTION_DATA_OUTPUT_PROPS))) {
+                int maxOutputData = conf.getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA,
+                        2 * 1024);
+                ret.put(LauncherMapper.ACTION_DATA_OUTPUT_PROPS, PropertiesUtils
+                        .propertiesToString(readLegacyFileAsProperties(fs, file, maxOutputData)));
+            }
+            else if (file.equals(new Path(actionDir, LauncherMapper.ACTION_DATA_STATS))) {
+                int statsMaxOutputData = conf.getInt(LauncherMapper.CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE,
+                        Integer.MAX_VALUE);
+                ret.put(LauncherMapper.ACTION_DATA_STATS, PropertiesUtils
+                        .propertiesToString(readLegacyFileAsProperties(fs, file, statsMaxOutputData)));
+            }
+            else if (file.equals(new Path(actionDir, LauncherMapper.ACTION_DATA_ERROR_PROPS))) {
+                ret.put(LauncherMapper.ACTION_DATA_ERROR_PROPS, readLegacyFileAsString(fs, file));
+            }
+        }
+    }
+
+    /**
+     * Read a whole legacy file as a string and always close the underlying stream.
+     */
+    private static String readLegacyFileAsString(FileSystem fs, Path file) throws IOException {
+        BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(file)));
+        try {
+            return IOUtils.getReaderAsString(reader, -1);
+        }
+        finally {
+            // Closed here whether or not the helper closes it too; a second close is harmless.
+            reader.close();
+        }
+    }
+
+    /**
+     * Read a legacy file as properties, passing {@code maxLen} through to
+     * {@code PropertiesUtils.readProperties}, and always close the underlying stream.
+     */
+    private static Properties readLegacyFileAsProperties(FileSystem fs, Path file, int maxLen)
+            throws IOException {
+        BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(file)));
+        try {
+            return PropertiesUtils.readProperties(reader, maxLen);
+        }
+        finally {
+            // Closed here whether or not the helper closes it too; a second close is harmless.
+            reader.close();
+        }
+    }
+
+    /**
+     * Compute the YARN tag for an action. The first available source wins:
+     * <ol>
+     * <li>the {@code OOZIE_ACTION_YARN_TAG} value in {@code conf}, suffixed with {@code "@"} and the action name;</li>
+     * <li>{@code parentId}, suffixed with {@code "@"} and the action name;</li>
+     * <li>the action id on its own.</li>
+     * </ol>
+     *
+     * @param conf configuration that may carry {@code OOZIE_ACTION_YARN_TAG}; may be null
+     * @param parentId parent id used when no tag is configured; may be null
+     * @param wfAction workflow action supplying the name and id
+     * @return the tag string
+     */
+    public static String getActionYarnTag(Configuration conf, String parentId, WorkflowAction wfAction) {
+        String configuredTag = (conf != null) ? conf.get(OOZIE_ACTION_YARN_TAG) : null;
+        if (configuredTag != null) {
+            return configuredTag + "@" + wfAction.getName();
+        }
+        if (parentId != null) {
+            return parentId + "@" + wfAction.getName();
+        }
+        return wfAction.getId();
+    }
+
+}