OOZIE-2595 Make Pig action work, fix test cases Change-Id: I256d90652d116b83a5a8ced1fb23839de7e6aa70
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/11a84295 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/11a84295 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/11a84295 Branch: refs/heads/oya Commit: 11a84295a80da0707699a52532ff5630baf99555 Parents: ca7e56f Author: Peter Bacsko <pbac...@cloudera.com> Authored: Mon Sep 26 14:20:04 2016 +0200 Committer: Peter Cseh <gezap...@cloudera.com> Committed: Mon Sep 26 15:25:51 2016 +0200 ---------------------------------------------------------------------- .../oozie/action/hadoop/JavaActionExecutor.java | 18 +++++-- .../oozie/service/HadoopAccessorService.java | 17 ++++--- .../apache/oozie/action/hadoop/LauncherAM.java | 6 +++ .../action/hadoop/TestPigActionExecutor.java | 52 +++++++------------- .../hadoop/TestMapReduceActionExecutor.java | 20 -------- 5 files changed, 49 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/11a84295/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java index 8637f64..8b5f2b0 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java @@ -242,7 +242,9 @@ public class JavaActionExecutor extends ActionExecutor { } else { conf = new JobConf(false); + // conf.set(HadoopAccessorService.OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, "true"); } + conf.set(HADOOP_USER, context.getProtoActionConf().get(WorkflowAppService.HADOOP_USER)); conf.set(HADOOP_YARN_RM, jobTracker); conf.set(HADOOP_NAME_NODE, nameNode); @@ -1485,13 +1487,21 @@ public class JavaActionExecutor extends ActionExecutor { " action data. Failing this action!", action.getExternalId(), action.getId()); } } - String externalIDs = actionData.get(LauncherAM.ACTION_DATA_NEW_ID); // MapReduce was launched + + String externalID = actionData.get(LauncherAM.ACTION_DATA_NEW_ID); // MapReduce was launched + if (externalID != null) { + context.setExternalChildIDs(externalID); + LOG.info(XLog.STD, "Hadoop Job was launched : [{0}]", externalID); + } + + // Multiple child IDs - Pig or Hive action + String externalIDs = actionData.get(LauncherAM.ACTION_DATA_EXTERNAL_CHILD_IDS); if (externalIDs != null) { context.setExternalChildIDs(externalIDs); - LOG.info(XLog.STD, "Hadoop Jobs launched : [{0}]", externalIDs); + LOG.info(XLog.STD, "External Child IDs : [{0}]", externalIDs); } - LOG.info(XLog.STD, "action completed, external ID [{0}]", - action.getExternalId()); + + LOG.info(XLog.STD, "action completed, external ID [{0}]", action.getExternalId()); context.setExecutionData(appStatus.toString(), null); if (appStatus == FinalApplicationStatus.SUCCEEDED) { if (getCaptureOutput(action) && LauncherMapperHelper.hasOutputData(actionData)) { http://git-wip-us.apache.org/repos/asf/oozie/blob/11a84295/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java b/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java index 0177241..5845e17 100644 --- a/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java +++ b/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java @@ -86,17 +86,16 @@ public class HadoopAccessorService implements Service { public static final String KERBEROS_PRINCIPAL = CONF_PREFIX + "kerberos.principal"; public static final Text MR_TOKEN_ALIAS = new Text("oozie mr token"); - protected static final String OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED = "oozie.HadoopAccessorService.created"; /** The Kerberos principal for the job tracker.*/ protected static final String JT_PRINCIPAL = "mapreduce.jobtracker.kerberos.principal"; /** The Kerberos principal for the resource manager.*/ protected static final String RM_PRINCIPAL = "yarn.resourcemanager.principal"; protected static final String HADOOP_YARN_RM = "yarn.resourcemanager.address"; - private static final Map<String, Text> mrTokenRenewers = new HashMap<String, Text>(); - - private static Configuration cachedConf; + private static final String OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED = "oozie.HadoopAccessorService.created"; + private static final Map<String, Text> mrTokenRenewers = new HashMap<String, Text>(); private static final String DEFAULT_ACTIONNAME = "default"; + private static Configuration cachedConf; private Set<String> jobTrackerWhitelist = new HashSet<String>(); private Set<String> nameNodeWhitelist = new HashSet<String>(); @@ -564,8 +563,14 @@ public class HadoopAccessorService implements Service { */ public FileSystem createFileSystem(String user, final URI uri, final Configuration conf) throws HadoopAccessorException { + return createFileSystem(user, uri, conf, true); + } + + private FileSystem createFileSystem(String user, final URI uri, final Configuration conf, boolean checkAccessorProperty) + throws HadoopAccessorException { ParamChecker.notEmpty(user, "user"); - if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) { + + if (checkAccessorProperty && !conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) { throw new HadoopAccessorException(ErrorCode.E0903); } @@ -750,7 +755,7 @@ public class HadoopAccessorService implements Service { fos.close(); } } - FileSystem fs = createFileSystem(user, uri, conf); + FileSystem fs = createFileSystem(user, uri, conf, false); Path dst = new Path(dir, filename); fs.copyFromLocalFile(new Path(f.getAbsolutePath()), dst); LocalResource localResource = Records.newRecord(LocalResource.class); http://git-wip-us.apache.org/repos/asf/oozie/blob/11a84295/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java index 43ce520..c923dda 100644 --- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java @@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -191,8 +192,13 @@ public class LauncherAM { } } } catch (Exception e) { + System.out.println("Launcher AM execution failed"); System.err.println("Launcher AM execution failed"); + e.printStackTrace(System.out); e.printStackTrace(System.err); + finalStatus = FinalApplicationStatus.FAILED; + eHolder.setErrorCause(e); + eHolder.setErrorMessage(e.getMessage()); throw e; } finally { try { http://git-wip-us.apache.org/repos/asf/oozie/blob/11a84295/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigActionExecutor.java ---------------------------------------------------------------------- diff --git a/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigActionExecutor.java b/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigActionExecutor.java index 16064e7..0d0adf4 100644 --- a/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigActionExecutor.java +++ b/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigActionExecutor.java @@ -18,44 +18,36 @@ package org.apache.oozie.action.hadoop; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.RunningJob; -import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.fs.Path; import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.WorkflowJobBean; import org.apache.oozie.client.WorkflowAction; import org.apache.oozie.service.ConfigurationService; -import org.apache.oozie.service.URIHandlerService; -import org.apache.oozie.service.WorkflowAppService; import org.apache.oozie.service.Services; -import org.apache.oozie.service.HadoopAccessorService; +import org.apache.oozie.service.WorkflowAppService; +import org.apache.oozie.util.IOUtils; import org.apache.oozie.util.XConfiguration; import org.apache.oozie.util.XmlUtils; -import org.apache.oozie.util.IOUtils; -import org.codehaus.jackson.JsonParser; import org.jdom.Element; import org.json.simple.JSONValue; import org.json.simple.parser.JSONParser; -import java.io.File; -import java.io.IOException; -import java.io.OutputStream; -import java.io.InputStream; -import java.io.FileInputStream; -import java.io.Writer; -import java.io.OutputStreamWriter; -import java.io.StringReader; -import java.net.URI; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - public class TestPigActionExecutor extends ActionExecutorTestCase { private static final String PIG_SCRIPT = "set job.name 'test'\n" + "set debug on\n" + @@ -147,18 +139,10 @@ public class TestPigActionExecutor extends ActionExecutorTestCase { private String submitAction(Context context) throws Exception { PigActionExecutor ae = new PigActionExecutor(); - WorkflowAction action = context.getAction(); - ae.prepareActionDir(getFileSystem(), context); ae.submitLauncher(getFileSystem(), context, action); - String jobId = action.getExternalId(); - String jobTracker = action.getTrackerUri(); - String consoleUrl = action.getConsoleUrl(); - assertNotNull(jobId); - assertNotNull(jobTracker); - assertNotNull(consoleUrl); return jobId; } @@ -217,11 +201,11 @@ public class TestPigActionExecutor extends ActionExecutorTestCase { ae.check(context, wfAction); ae.end(context, wfAction); - assertEquals("SUCCEEDED", wfAction.getExternalStatus()); + assertEquals(JavaActionExecutor.SUCCEEDED, wfAction.getExternalStatus()); String stats = wfAction.getStats(); assertNotNull(stats); // check for some of the expected key values in the stats - Map m = (Map)JSONValue.parse(stats); + Map m = (Map) JSONValue.parse(stats); // check for expected 1st level JSON keys assertTrue(m.containsKey("PIG_VERSION")); @@ -229,7 +213,7 @@ public class TestPigActionExecutor extends ActionExecutorTestCase { String[] childIDs = expectedChildIDs.split(","); assertTrue(m.containsKey(childIDs[0])); - Map q = (Map)m.get(childIDs[0]); + Map q = (Map) m.get(childIDs[0]); // check for expected 2nd level JSON keys assertTrue(q.containsKey("HADOOP_COUNTERS")); } http://git-wip-us.apache.org/repos/asf/oozie/blob/11a84295/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java ---------------------------------------------------------------------- diff --git a/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java b/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java index 53330ce..39ee0bc 100644 --- a/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java +++ b/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java @@ -395,26 +395,6 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { ae.prepareActionDir(getFileSystem(), context); ae.submitLauncher(getFileSystem(), context, action); - String jobId = action.getExternalId(); - String jobTracker = action.getTrackerUri(); - String consoleUrl = action.getConsoleUrl(); - assertNotNull(jobId); - assertNotNull(jobTracker); - assertNotNull(consoleUrl); - - Element e = XmlUtils.parseXml(action.getConf()); - XConfiguration conf = new XConfiguration(new StringReader(XmlUtils.prettyPrint(e.getChild("configuration")) - .toString())); - conf.set("mapred.job.tracker", e.getChildTextTrim("job-tracker")); - conf.set("fs.default.name", e.getChildTextTrim("name-node")); - conf.set("user.name", context.getProtoActionConf().get("user.name")); - conf.set("group.name", getTestGroup()); - conf.set("mapreduce.framework.name", "yarn"); - - JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker); - XConfiguration.copy(conf, jobConf); - - ae.submitLauncher(getFileSystem(), context, context.getAction()); return context.getAction().getExternalId(); }