Merge branch 'master' into oya
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/19f56172 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/19f56172 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/19f56172 Branch: refs/heads/oya Commit: 19f561726e62ae700bd3370d72295c12ec5ac484 Parents: 523ec74 53b1d1e Author: Gezapeti Cseh <gezap...@gmail.com> Authored: Tue May 23 18:21:20 2017 +0200 Committer: Gezapeti Cseh <gezap...@gmail.com> Committed: Tue May 23 18:42:17 2017 +0200 ---------------------------------------------------------------------- .../oozie/action/hadoop/JavaActionExecutor.java | 6 ++++ core/src/main/resources/oozie-default.xml | 9 +++++ .../action/hadoop/TestJavaActionExecutor.java | 18 ++++++++-- pom.xml | 2 +- release-log.txt | 3 ++ .../apache/oozie/action/hadoop/LauncherAM.java | 8 +---- .../oozie/action/hadoop/LauncherMapper.java | 35 ++++++++++++------ .../oozie/action/hadoop/TestLauncherMapper.java | 37 +++++++++++++++++--- 8 files changed, 93 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/19f56172/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java ---------------------------------------------------------------------- diff --cc core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java index 7836c74,06ae5fd..f4c1127 --- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java @@@ -835,7 -997,13 +835,13 @@@ public class JavaActionExecutor extend for (int i = 0; i < list.size(); i++) { args[i] = list.get(i).getTextTrim(); } - LauncherMapperHelper.setupMainArguments(launcherJobConf, args); + LauncherHelper.setupMainArguments(launcherJobConf, args); + // backward compatibility flag - see OOZIE-2872 + if (ConfigurationService.getBoolean(LauncherMapper.CONF_OOZIE_NULL_ARGS_ALLOWED)) { + launcherJobConf.setBoolean(LauncherMapper.CONF_OOZIE_NULL_ARGS_ALLOWED, true); + } else { + launcherJobConf.setBoolean(LauncherMapper.CONF_OOZIE_NULL_ARGS_ALLOWED, false); + } // Make mapred.child.java.opts and mapreduce.map.java.opts equal, but give values from the latter priority; also append // <java-opt> and <java-opts> and give those highest priority http://git-wip-us.apache.org/repos/asf/oozie/blob/19f56172/core/src/main/resources/oozie-default.xml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/19f56172/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java ---------------------------------------------------------------------- diff --cc core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java index 749050f,b27b3d8..48809ce --- a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java +++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java @@@ -2216,11 -2961,18 +2227,12 @@@ public class TestJavaActionExecutor ext "</java>"; Context context = createContext(actionXml, null); - final RunningJob runningJob = submitAction(context); - waitFor(60 * 1000, new Predicate() { - @Override - public boolean evaluate() throws Exception { - return runningJob.isComplete(); - } - }); - assertTrue(runningJob.isSuccessful()); + submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(context.getAction().getExternalId()); ActionExecutor ae = new JavaActionExecutor(); ae.check(context, context.getAction()); - assertEquals("FAILED/KILLED", context.getAction().getExternalStatus()); + assertTrue(ae.isCompleted(context.getAction().getExternalStatus())); + assertEquals(expectedExternalStatus, context.getAction().getExternalStatus()); assertNull(context.getAction().getData()); ae.end(context, context.getAction()); http://git-wip-us.apache.org/repos/asf/oozie/blob/19f56172/pom.xml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/19f56172/release-log.txt ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/19f56172/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java ---------------------------------------------------------------------- diff --cc sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java index 9484804,0000000..4f252d1 mode 100644,000000..100644 --- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java @@@ -1,620 -1,0 +1,614 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.oozie.action.hadoop; + +import java.io.File; +import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.security.Permission; +import java.security.PrivilegedExceptionAction; +import java.text.MessageFormat; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.StringTokenizer; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.exceptions.YarnException; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; + +public class LauncherAM { + private static final String OOZIE_ACTION_CONF_XML = "oozie.action.conf.xml"; + private static final String OOZIE_LAUNCHER_JOB_ID = "oozie.launcher.job.id"; + + public static final String JAVA_CLASS_PATH = "java.class.path"; + public static final String OOZIE_ACTION_ID = "oozie.action.id"; + public static final String OOZIE_JOB_ID = "oozie.job.id"; + public static final String ACTION_PREFIX = "oozie.action."; + static final String OOZIE_ACTION_RECOVERY_ID = ACTION_PREFIX + "recovery.id"; + public static final String CONF_OOZIE_ACTION_MAX_OUTPUT_DATA = ACTION_PREFIX + "max.output.data"; + public static final String CONF_OOZIE_ACTION_MAIN_ARG_PREFIX = ACTION_PREFIX + "main.arg."; + public static final String CONF_OOZIE_ACTION_MAIN_ARG_COUNT = CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + "count"; + public static final String CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE = "oozie.external.stats.max.size"; + public static final String OOZIE_ACTION_DIR_PATH = ACTION_PREFIX + "dir.path"; + public static final String ACTION_PREPARE_XML = ACTION_PREFIX + "prepare.xml"; + public static final String ACTION_DATA_SEQUENCE_FILE = "action-data.seq"; // COMBO FILE + public static final String ACTION_DATA_EXTERNAL_CHILD_IDS = "externalChildIDs"; + public static final String ACTION_DATA_OUTPUT_PROPS = "output.properties"; + public static final String ACTION_DATA_STATS = "stats.properties"; + public static final String ACTION_DATA_NEW_ID = "newId"; + public static final String ACTION_DATA_ERROR_PROPS = "error.properties"; + public static final String CONF_OOZIE_ACTION_MAIN_CLASS = "oozie.launcher.action.main.class"; + + // TODO: OYA: more unique file names? action.xml may be stuck for backwards compat though + public static final String LAUNCHER_JOB_CONF_XML = "launcher.xml"; + public static final String ACTION_CONF_XML = "action.xml"; + public static final String ACTION_DATA_FINAL_STATUS = "final.status"; + + private final UserGroupInformation ugi; + private final AMRMCallBackHandler callbackHandler; + private final AMRMClientAsyncFactory amRmClientAsyncFactory; + private final HdfsOperations hdfsOperations; + private final LocalFsOperations localFsOperations; + private final PrepareActionsHandler prepareHandler; + private final LauncherAMCallbackNotifierFactory callbackNotifierFactory; + private final LauncherSecurityManager launcherSecurityManager; + private final ContainerId containerId; + + private Configuration launcherJobConf; + private AMRMClientAsync<?> amRmClientAsync; + private Path actionDir; + private Map<String, String> actionData = new HashMap<String,String>(); + + public LauncherAM(UserGroupInformation ugi, + AMRMClientAsyncFactory amRmClientAsyncFactory, + AMRMCallBackHandler callbackHandler, + HdfsOperations hdfsOperations, + LocalFsOperations localFsOperations, + PrepareActionsHandler prepareHandler, + LauncherAMCallbackNotifierFactory callbackNotifierFactory, + LauncherSecurityManager launcherSecurityManager, + String containerId) { + this.ugi = Preconditions.checkNotNull(ugi, "ugi should not be null"); + this.amRmClientAsyncFactory = Preconditions.checkNotNull(amRmClientAsyncFactory, + "amRmClientAsyncFactory should not be null"); + this.callbackHandler = Preconditions.checkNotNull(callbackHandler, "callbackHandler should not be null"); + this.hdfsOperations = Preconditions.checkNotNull(hdfsOperations, "hdfsOperations should not be null"); + this.localFsOperations = Preconditions.checkNotNull(localFsOperations, "localFsOperations should not be null"); + this.prepareHandler = Preconditions.checkNotNull(prepareHandler, "prepareHandler should not be null"); + this.callbackNotifierFactory = Preconditions.checkNotNull(callbackNotifierFactory, + "callbackNotifierFactory should not be null"); + this.launcherSecurityManager = Preconditions.checkNotNull(launcherSecurityManager, + "launcherSecurityManager should not be null"); + this.containerId = ContainerId.fromString(Preconditions.checkNotNull(containerId, "containerId should not be null")); + } + + public static void main(String[] args) throws Exception { + UserGroupInformation ugi = null; + String submitterUser = System.getProperty("submitter.user", "").trim(); + Preconditions.checkArgument(!submitterUser.isEmpty(), "Submitter user is undefined"); + System.out.println("Submitter user is: " + submitterUser); + + // We don't need remote/proxy user if the current login user is the workflow submitter + // Otherwise we have to create a remote user + if (UserGroupInformation.getLoginUser().getShortUserName().equals(submitterUser)) { + System.out.println("Using login user for UGI"); + ugi = UserGroupInformation.getLoginUser(); + } else { + ugi = UserGroupInformation.createRemoteUser(submitterUser); + ugi.addCredentials(UserGroupInformation.getLoginUser().getCredentials()); + } + + AMRMClientAsyncFactory amRmClientAsyncFactory = new AMRMClientAsyncFactory(); + AMRMCallBackHandler callbackHandler = new AMRMCallBackHandler(); + HdfsOperations hdfsOperations = new HdfsOperations(new SequenceFileWriterFactory(), ugi); + LocalFsOperations localFSOperations = new LocalFsOperations(); + PrepareActionsHandler prepareHandler = new PrepareActionsHandler(); + LauncherAMCallbackNotifierFactory callbackNotifierFactory = new LauncherAMCallbackNotifierFactory(); + LauncherSecurityManager launcherSecurityManager = new LauncherSecurityManager(); + + LauncherAM launcher = new LauncherAM(ugi, + amRmClientAsyncFactory, + callbackHandler, + hdfsOperations, + localFSOperations, + prepareHandler, + callbackNotifierFactory, + launcherSecurityManager, + System.getenv("CONTAINER_ID")); + + launcher.run(); + } + + public void run() throws Exception { + final ErrorHolder errorHolder = new ErrorHolder(); + OozieActionResult actionResult = OozieActionResult.FAILED; + boolean launcherExecutedProperly = false; + boolean backgroundAction = false; + + try { + try { + launcherJobConf = localFsOperations.readLauncherConf(); + System.out.println("Launcher AM configuration loaded"); + } catch (Exception ex) { + errorHolder.setErrorMessage("Could not load the Launcher AM configuration file"); + errorHolder.setErrorCause(ex); + throw ex; + } + actionDir = new Path(launcherJobConf.get(OOZIE_ACTION_DIR_PATH)); + + registerWithRM(); + executePrepare(ugi, errorHolder); + final String[] mainArgs = getMainArguments(launcherJobConf); + printDebugInfo(); + setupMainConfiguration(); + launcherExecutedProperly = runActionMain(mainArgs, errorHolder, ugi); + + if (launcherExecutedProperly) { + handleActionData(); + if (actionData.get(ACTION_DATA_OUTPUT_PROPS) != null) { + System.out.println(); + System.out.println("Oozie Launcher, capturing output data:"); + System.out.println("======================="); + System.out.println(actionData.get(ACTION_DATA_OUTPUT_PROPS)); + System.out.println(); + System.out.println("======================="); + System.out.println(); + } + if (actionData.get(ACTION_DATA_NEW_ID) != null) { + System.out.println(); + System.out.println("Oozie Launcher, propagating new Hadoop job id to Oozie"); + System.out.println("======================="); + System.out.println(actionData.get(ACTION_DATA_NEW_ID)); + System.out.println("======================="); + System.out.println(); + backgroundAction = true; + } + } + } catch (Exception e) { + System.out.println("Launcher AM execution failed"); + System.err.println("Launcher AM execution failed"); + e.printStackTrace(System.out); + e.printStackTrace(System.err); + launcherExecutedProperly = false; + if (!errorHolder.isPopulated()) { + errorHolder.setErrorCause(e); + errorHolder.setErrorMessage(e.getMessage()); + } + throw e; + } finally { + try { + ErrorHolder callbackErrorHolder = callbackHandler.getError(); + + if (launcherExecutedProperly) { + actionResult = backgroundAction ? OozieActionResult.RUNNING : OozieActionResult.SUCCEEDED; + } + + if (!launcherExecutedProperly) { + updateActionDataWithFailure(errorHolder, actionData); + } else if (callbackErrorHolder != null) { // async error from the callback + actionResult = OozieActionResult.FAILED; + updateActionDataWithFailure(callbackErrorHolder, actionData); + } + + actionData.put(ACTION_DATA_FINAL_STATUS, actionResult.toString()); + hdfsOperations.uploadActionDataToHDFS(launcherJobConf, actionDir, actionData); + } finally { + try { + unregisterWithRM(actionResult, errorHolder.getErrorMessage()); + } finally { + LauncherAMCallbackNotifier cn = callbackNotifierFactory.createCallbackNotifier(launcherJobConf); + cn.notifyURL(actionResult); + } + } + } + } + + @VisibleForTesting + Map<String, String> getActionData() { + return actionData; + } + + private void printDebugInfo() throws IOException { + localFsOperations.printContentsOfDir(new File(".")); + + System.out.println(); + System.out.println("Oozie Launcher Application Master configuration"); + System.out.println("==============================================="); + System.out.println("Workflow job id : " + launcherJobConf.get(OOZIE_JOB_ID)); + System.out.println("Workflow action id: " + launcherJobConf.get(OOZIE_ACTION_ID)); + System.out.println(); + System.out.println("Classpath :"); + System.out.println("------------------------"); + StringTokenizer st = new StringTokenizer(System.getProperty(JAVA_CLASS_PATH), ":"); + while (st.hasMoreTokens()) { + System.out.println(" " + st.nextToken()); + } + System.out.println("------------------------"); + System.out.println(); + String mainClass = launcherJobConf.get(CONF_OOZIE_ACTION_MAIN_CLASS); + System.out.println("Main class : " + mainClass); + System.out.println(); + System.out.println("Maximum output : " + + launcherJobConf.getInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 2 * 1024)); + System.out.println(); + + System.out.println(); + System.out.println("Java System Properties:"); + System.out.println("------------------------"); + System.getProperties().store(System.out, ""); + System.out.println("------------------------"); + System.out.println(); + + System.out.println("Environment variables"); + Map<String, String> env = System.getenv(); + System.out.println("------------------------"); + for (Map.Entry<String, String> entry : env.entrySet()) { + System.out.println(entry.getKey() + "=" + entry.getValue()); + } + System.out.println("------------------------"); + System.out.println("================================================================="); + System.out.println(); + System.out.println(">>> Invoking Main class now >>>"); + System.out.println(); + } + + private void registerWithRM() throws IOException, YarnException { + // TODO: OYA: make heartbeat interval configurable & make interval higher to put less load on RM, but lower than timeout + amRmClientAsync = amRmClientAsyncFactory.createAMRMClientAsync(60000); + amRmClientAsync.init(new Configuration(launcherJobConf)); + amRmClientAsync.start(); + + // hostname and tracking url are determined automatically + amRmClientAsync.registerApplicationMaster("", 0, ""); + } + + private void unregisterWithRM(OozieActionResult actionResult, String message) throws YarnException, IOException { + if (amRmClientAsync != null) { + System.out.println("Stopping AM"); + try { + message = (message == null) ? "" : message; + // tracking url is determined automatically + amRmClientAsync.unregisterApplicationMaster(actionResult.getYarnStatus(), message, ""); + } catch (Exception ex) { + System.out.println("Error un-registering AM client"); + throw ex; + } finally { + amRmClientAsync.stop(); + } + } + } + + // Method to execute the prepare actions + private void executePrepare(UserGroupInformation ugi, ErrorHolder errorHolder) throws Exception { + try { + System.out.println("\nStarting the execution of prepare actions"); + ugi.doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + String prepareXML = launcherJobConf.get(ACTION_PREPARE_XML); + if (prepareXML != null) { + if (prepareXML.length() != 0) { + Configuration actionConf = new Configuration(launcherJobConf); + actionConf.addResource(ACTION_CONF_XML); + prepareHandler.prepareAction(prepareXML, actionConf); + } else { + System.out.println("There are no prepare actions to execute."); + } + } + return null; + } + }); + System.out.println("Completed the execution of prepare actions successfully"); + } catch (Exception ex) { + errorHolder.setErrorMessage("Prepare execution in the Launcher AM has failed"); + errorHolder.setErrorCause(ex); + throw ex; + } + } + + private void setupMainConfiguration() throws IOException { + System.setProperty(OOZIE_LAUNCHER_JOB_ID, launcherJobConf.get(OOZIE_JOB_ID)); + System.setProperty(OOZIE_JOB_ID, launcherJobConf.get(OOZIE_JOB_ID)); + System.setProperty(OOZIE_ACTION_ID, launcherJobConf.get(OOZIE_ACTION_ID)); + System.setProperty(OOZIE_ACTION_CONF_XML, new File(ACTION_CONF_XML).getAbsolutePath()); + System.setProperty(ACTION_PREFIX + ACTION_DATA_EXTERNAL_CHILD_IDS, + new File(ACTION_DATA_EXTERNAL_CHILD_IDS).getAbsolutePath()); + System.setProperty(ACTION_PREFIX + ACTION_DATA_STATS, new File(ACTION_DATA_STATS).getAbsolutePath()); + System.setProperty(ACTION_PREFIX + ACTION_DATA_NEW_ID, new File(ACTION_DATA_NEW_ID).getAbsolutePath()); + System.setProperty(ACTION_PREFIX + ACTION_DATA_OUTPUT_PROPS, new File(ACTION_DATA_OUTPUT_PROPS).getAbsolutePath()); + System.setProperty(ACTION_PREFIX + ACTION_DATA_ERROR_PROPS, new File(ACTION_DATA_ERROR_PROPS).getAbsolutePath()); + + System.setProperty("oozie.job.launch.time", String.valueOf(System.currentTimeMillis())); + } + + private boolean runActionMain(final String[] mainArgs, final ErrorHolder eHolder, UserGroupInformation ugi) throws Exception { + // using AtomicBoolean because we want to modify it inside run() + final AtomicBoolean actionMainExecutedProperly = new AtomicBoolean(false); + + ugi.doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + try { + setRecoveryId(); + Class<?> klass = launcherJobConf.getClass(CONF_OOZIE_ACTION_MAIN_CLASS, null); + Preconditions.checkNotNull(klass, "Launcher class should not be null"); + System.out.println("Launcher class: " + klass.toString()); + Method mainMethod = klass.getMethod("main", String[].class); + // Enable LauncherSecurityManager to catch System.exit calls + launcherSecurityManager.enable(); + mainMethod.invoke(null, (Object) mainArgs); + + System.out.println(); + System.out.println("<<< Invocation of Main class completed <<<"); + System.out.println(); + actionMainExecutedProperly.set(true); + } catch (InvocationTargetException ex) { + ex.printStackTrace(System.out); + // Get what actually caused the exception + Throwable cause = ex.getCause(); + // If we got a JavaMainException from JavaMain, then we need to unwrap it + if (JavaMain.JavaMainException.class.isInstance(cause)) { + cause = cause.getCause(); + } + if (LauncherMainException.class.isInstance(cause)) { + int errorCode = ((LauncherMainException) ex.getCause()).getErrorCode(); + String mainClass = launcherJobConf.get(CONF_OOZIE_ACTION_MAIN_CLASS); + eHolder.setErrorMessage("Main Class [" + mainClass + "], exit code [" + + errorCode + "]"); + eHolder.setErrorCode(errorCode); + } else if (SecurityException.class.isInstance(cause)) { + if (launcherSecurityManager.getExitInvoked()) { + final int exitCode = launcherSecurityManager.getExitCode(); + System.out.println("Intercepting System.exit(" + exitCode + ")"); + // if 0 main() method finished successfully + // ignoring + eHolder.setErrorCode(exitCode); + if (exitCode != 0) { + String mainClass = launcherJobConf.get(CONF_OOZIE_ACTION_MAIN_CLASS); + eHolder.setErrorMessage("Main Class [" + mainClass + "]," + + " exit code [" + eHolder.getErrorCode() + "]"); + } else { + actionMainExecutedProperly.set(true); + } + } else { + // just SecurityException, no exit was invoked + eHolder.setErrorCode(0); + eHolder.setErrorCause(cause); + eHolder.setErrorMessage(cause.getMessage()); + } + } else { + eHolder.setErrorMessage(cause.getMessage()); + eHolder.setErrorCause(cause); + } + } catch (Throwable t) { + t.printStackTrace(); + eHolder.setErrorMessage(t.getMessage()); + eHolder.setErrorCause(t); + } finally { + // Disable LauncherSecurityManager + launcherSecurityManager.disable(); + } + + return null; + } + }); + + return actionMainExecutedProperly.get(); + } + + private void setRecoveryId() throws LauncherException { + try { + ApplicationId applicationId = containerId.getApplicationAttemptId().getApplicationId(); + String applicationIdStr = applicationId.toString(); + + String recoveryId = Preconditions.checkNotNull(launcherJobConf.get(OOZIE_ACTION_RECOVERY_ID), + "RecoveryID should not be null"); + + Path path = new Path(actionDir, recoveryId); + if (!hdfsOperations.fileExists(path, launcherJobConf)) { + hdfsOperations.writeStringToFile(path, launcherJobConf, applicationIdStr); + } else { + String id = hdfsOperations.readFileContents(path, launcherJobConf); + + if (!applicationIdStr.equals(id)) { + throw new LauncherException(MessageFormat.format( + "YARN Id mismatch, action file [{0}] declares Id [{1}] current Id [{2}]", path, id, + applicationIdStr)); + } + } + } catch (RuntimeException | InterruptedException | IOException ex) { + throw new LauncherException("IO error", ex); + } + } + + private void handleActionData() throws IOException { + // external child IDs + processActionData(ACTION_PREFIX + ACTION_DATA_EXTERNAL_CHILD_IDS, null, + ACTION_DATA_EXTERNAL_CHILD_IDS, -1); + + // external stats + processActionData(ACTION_PREFIX + ACTION_DATA_STATS, CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE, + ACTION_DATA_STATS, Integer.MAX_VALUE); + + // output data + processActionData(ACTION_PREFIX + ACTION_DATA_OUTPUT_PROPS, CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, + ACTION_DATA_OUTPUT_PROPS, 2048); + + // id swap + processActionData(ACTION_PREFIX + ACTION_DATA_NEW_ID, null, + ACTION_DATA_NEW_ID, -1); + } + + private void processActionData(String propertyName, String maxSizePropertyName, String actionDataPropertyName, + int maxSizeDefault) throws IOException { + String propValue = System.getProperty(propertyName); + int maxSize = maxSizeDefault; + + if (maxSizePropertyName != null) { + maxSize = launcherJobConf.getInt(maxSizePropertyName, maxSizeDefault); + } + + if (propValue != null) { + File actionDataFile = new File(propValue); + if (localFsOperations.fileExists(actionDataFile)) { + actionData.put(actionDataPropertyName, localFsOperations.getLocalFileContentAsString(actionDataFile, + actionDataPropertyName, maxSize)); + } + } + } + + private void updateActionDataWithFailure(ErrorHolder eHolder, Map<String, String> actionData) { + if (eHolder.getErrorCause() != null && eHolder.getErrorCause().getMessage() != null) { + if (Objects.equal(eHolder.getErrorMessage(), eHolder.getErrorCause().getMessage())) { + eHolder.setErrorMessage(eHolder.getErrorMessage()); + } else { + eHolder.setErrorMessage(eHolder.getErrorMessage() + ", " + eHolder.getErrorCause().getMessage()); + } + } + + Properties errorProps = new Properties(); + errorProps.setProperty("error.code", Integer.toString(eHolder.getErrorCode())); + String errorMessage = eHolder.getErrorMessage() == null ? "<empty>" : eHolder.getErrorMessage(); + errorProps.setProperty("error.reason", errorMessage); + if (eHolder.getErrorCause() != null) { + if (eHolder.getErrorCause().getMessage() != null) { + errorProps.setProperty("exception.message", eHolder.getErrorCause().getMessage()); + } + StringWriter sw = new StringWriter(); + PrintWriter pw = new PrintWriter(sw); + eHolder.getErrorCause().printStackTrace(pw); + pw.close(); + errorProps.setProperty("exception.stacktrace", sw.toString()); + } + + StringWriter sw = new StringWriter(); + try { + errorProps.store(sw, ""); + sw.close(); + actionData.put(LauncherAM.ACTION_DATA_ERROR_PROPS, sw.toString()); + + // external child IDs + String externalChildIdsProp = System.getProperty(LauncherAM.ACTION_PREFIX + LauncherAM.ACTION_DATA_EXTERNAL_CHILD_IDS); + if (externalChildIdsProp != null) { + File externalChildIDs = new File(externalChildIdsProp); + if (localFsOperations.fileExists(externalChildIDs)) { + actionData.put(LauncherAM.ACTION_DATA_EXTERNAL_CHILD_IDS, + localFsOperations.getLocalFileContentAsString(externalChildIDs, ACTION_DATA_EXTERNAL_CHILD_IDS, -1)); + } + } + } catch (IOException ioe) { + System.out.println("A problem occured trying to fail the launcher"); + ioe.printStackTrace(); + } finally { + System.out.print("Failing Oozie Launcher, " + eHolder.getErrorMessage() + "\n"); + if (eHolder.getErrorCause() != null) { + eHolder.getErrorCause().printStackTrace(System.out); + } + } + } + + private String[] getMainArguments(Configuration conf) { - String[] args = new String[conf.getInt(CONF_OOZIE_ACTION_MAIN_ARG_COUNT, 0)]; - - for (int i = 0; i < args.length; i++) { - args[i] = conf.get(CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + i); - } - - return args; ++ return LauncherMapper.getMainArguments(conf); + } + + public static class LauncherSecurityManager extends SecurityManager { + private boolean exitInvoked; + private int exitCode; + private SecurityManager originalSecurityManager; + + public LauncherSecurityManager() { + exitInvoked = false; + exitCode = 0; + originalSecurityManager = System.getSecurityManager(); + } + + @Override + public void checkPermission(Permission perm, Object context) { + if (originalSecurityManager != null) { + // check everything with the original SecurityManager + originalSecurityManager.checkPermission(perm, context); + } + } + + @Override + public void checkPermission(Permission perm) { + if (originalSecurityManager != null) { + // check everything with the original SecurityManager + originalSecurityManager.checkPermission(perm); + } + } + + @Override + public void checkExit(int status) throws SecurityException { + exitInvoked = true; + exitCode = status; + throw new SecurityException("Intercepted System.exit(" + status + ")"); + } + + public boolean getExitInvoked() { + return exitInvoked; + } + + public int getExitCode() { + return exitCode; + } + + public void enable() { + if (System.getSecurityManager() != this) { + System.setSecurityManager(this); + } + } + + public void disable() { + if (System.getSecurityManager() == this) { + System.setSecurityManager(originalSecurityManager); + } + } + } + + public enum OozieActionResult { + SUCCEEDED(FinalApplicationStatus.SUCCEEDED), + FAILED(FinalApplicationStatus.FAILED), + RUNNING(FinalApplicationStatus.SUCCEEDED); + + // YARN-equivalent status + private FinalApplicationStatus yarnStatus; + + OozieActionResult(FinalApplicationStatus yarnStatus) { + this.yarnStatus = yarnStatus; + } + + public FinalApplicationStatus getYarnStatus() { + return yarnStatus; + } + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/19f56172/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java ----------------------------------------------------------------------