Consolidate killing of child YARN applications in ActionExecutors
Change-Id: Ifa2b4c0cc6fab7c4d24815c6da5ec9a604d4d28d
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/e8e5bd0b Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/e8e5bd0b Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/e8e5bd0b Branch: refs/heads/oya Commit: e8e5bd0b47d6ce384374dae997b35d92d12d005a Parents: 4d8549d Author: Gezapeti Cseh <gezap...@gmail.com> Authored: Tue May 9 18:34:13 2017 -0700 Committer: Gezapeti Cseh <gezap...@gmail.com> Committed: Tue May 9 18:44:39 2017 -0700 ---------------------------------------------------------------------- .../oozie/action/hadoop/JavaActionExecutor.java | 13 ++++- .../action/hadoop/MapReduceActionExecutor.java | 52 -------------------- 2 files changed, 11 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/e8e5bd0b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java index 980bd95..45a43bf 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java @@ -61,6 +61,7 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -1506,9 +1507,17 @@ public class JavaActionExecutor extends ActionExecutor { try { Element actionXml = 
XmlUtils.parseXml(action.getConf()); - Configuration jobConf = createBaseHadoopConf(context, actionXml); + final Configuration jobConf = createBaseHadoopConf(context, actionXml); + String launcherTag = LauncherHelper.getActionYarnTag(jobConf, context.getWorkflow().getParentId(), action); + jobConf.set(LauncherMain.CHILD_MAPREDUCE_JOB_TAGS, LauncherHelper.getTag(launcherTag)); + jobConf.set(LauncherMain.OOZIE_JOB_LAUNCH_TIME, Long.toString(action.getStartTime().getTime())); yarnClient = createYarnClient(context, jobConf); - yarnClient.killApplication(ConverterUtils.toApplicationId(action.getExternalId())); + for(ApplicationId id : LauncherMain.getChildYarnJobs(jobConf, ApplicationsRequestScope.ALL)){ + yarnClient.killApplication(id); + } + if(action.getExternalId() != null) { + yarnClient.killApplication(ConverterUtils.toApplicationId(action.getExternalId())); + } context.setExternalStatus(KILLED); context.setExecutionData(KILLED, null); http://git-wip-us.apache.org/repos/asf/oozie/blob/e8e5bd0b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java index c15e362..dfe19d1 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java @@ -398,58 +398,6 @@ public class MapReduceActionExecutor extends JavaActionExecutor { } @Override - public void kill(final Context context, final WorkflowAction action) throws ActionExecutorException { - // Kill the LauncherAM which submits the MR job - super.kill(context, action); - // TODO if action.getExternalChildIDs() is not empty, then kill based on that - // We have to check whether the MapReduce execution has started or not. 
If it has started, then we have to get - // the YARN ApplicationID based on the tag and kill it as well - YarnClient yarnClient = null; - try { - String tag = LauncherHelper.getTag(ActionExecutor.getActionYarnTag(new Configuration(), - context.getWorkflow(), action)); - GetApplicationsRequest gar = GetApplicationsRequest.newInstance(); - gar.setScope(ApplicationsRequestScope.ALL); - gar.setApplicationTags(Collections.singleton(tag)); - Element actionXml = XmlUtils.parseXml(action.getConf()); - Configuration actionConf = loadHadoopDefaultResources(context, actionXml); - ApplicationClientProtocol proxy = ClientRMProxy.createRMProxy(actionConf, ApplicationClientProtocol.class); - GetApplicationsResponse apps = proxy.getApplications(gar); - List<ApplicationReport> appsList = apps.getApplicationList(); - - if (appsList.size() > 1) { - String applications = Joiner.on(",").join(Iterables.transform(appsList, new Function<ApplicationReport, String>() { - @Override - public String apply(@Nonnull ApplicationReport input) { - return input.toString(); - } - })); - - LOG.error("Too many applications were returned: {0}", applications); - throw new IllegalArgumentException("Too many applications were returned"); - } else if (appsList.size() == 1) { - - yarnClient = YarnClient.createYarnClient(); - yarnClient.init(actionConf); - yarnClient.start(); - - ApplicationReport app = appsList.get(0); - LOG.info("Killing MapReduce job {0}, YARN Id: {1}", action.getExternalChildIDs(), - app.getApplicationId().toString()); - yarnClient.killApplication(app.getApplicationId()); - } else { - LOG.info("No MapReduce job to kill"); - } - } catch (Exception e) { - throw convertException(e); - } finally { - if (yarnClient != null) { - Closeables.closeQuietly(yarnClient); - } - } - } - - @Override void injectActionCallback(Context context, Configuration actionConf) { injectCallback(context, actionConf); }