Repository: oozie Updated Branches: refs/heads/master 53b1d1e43 -> 21761f5b5
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigActionExecutor.java ---------------------------------------------------------------------- diff --git a/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigActionExecutor.java b/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigActionExecutor.java index 25092ce..7c3c5bb 100644 --- a/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigActionExecutor.java +++ b/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigActionExecutor.java @@ -18,44 +18,36 @@ package org.apache.oozie.action.hadoop; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.RunningJob; -import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.fs.Path; import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.WorkflowJobBean; import org.apache.oozie.client.WorkflowAction; import org.apache.oozie.service.ConfigurationService; -import org.apache.oozie.service.URIHandlerService; -import org.apache.oozie.service.WorkflowAppService; import org.apache.oozie.service.Services; -import org.apache.oozie.service.HadoopAccessorService; +import org.apache.oozie.service.WorkflowAppService; +import org.apache.oozie.util.IOUtils; import org.apache.oozie.util.XConfiguration; import org.apache.oozie.util.XmlUtils; -import org.apache.oozie.util.IOUtils; -import org.codehaus.jackson.JsonParser; import org.jdom.Element; import org.json.simple.JSONValue; import org.json.simple.parser.JSONParser; -import java.io.File; -import java.io.IOException; -import java.io.OutputStream; -import java.io.InputStream; -import java.io.FileInputStream; -import java.io.Writer; -import java.io.OutputStreamWriter; -import java.io.StringReader; -import java.net.URI; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - public class TestPigActionExecutor extends ActionExecutorTestCase { private static final String PIG_SCRIPT = "set job.name 'test'\n" + "set debug on\n" + @@ -144,49 +136,21 @@ public class TestPigActionExecutor extends ActionExecutorTestCase { return new Context(wf, action); } - private RunningJob submitAction(Context context) throws Exception { + private String submitAction(Context context) throws Exception { PigActionExecutor ae = new PigActionExecutor(); - WorkflowAction action = context.getAction(); - ae.prepareActionDir(getFileSystem(), context); ae.submitLauncher(getFileSystem(), context, action); - String jobId = action.getExternalId(); - String jobTracker = action.getTrackerUri(); - String consoleUrl = action.getConsoleUrl(); - assertNotNull(jobId); - assertNotNull(jobTracker); - assertNotNull(consoleUrl); - - Element e = XmlUtils.parseXml(action.getConf()); - XConfiguration conf = - new XConfiguration(new StringReader(XmlUtils.prettyPrint(e.getChild("configuration")).toString())); - conf.set("mapred.job.tracker", e.getChildTextTrim("job-tracker")); - conf.set("fs.default.name", e.getChildTextTrim("name-node")); - conf.set("mapreduce.framework.name", "yarn"); - conf.set("user.name", context.getProtoActionConf().get("user.name")); - conf.set("group.name", getTestGroup()); - - JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker); - XConfiguration.copy(conf, jobConf); - String user = jobConf.get("user.name"); - String group = jobConf.get("group.name"); - JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf); - final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId)); - assertNotNull(runningJob); - return runningJob; + + return jobId; } private void _testSubmit(String actionXml, boolean checkForSuccess) throws Exception { Context context = createContext(actionXml); - final RunningJob launcherJob = submitAction(context); - String launcherId = context.getAction().getExternalId(); - evaluateLauncherJob(launcherJob); - assertTrue(launcherJob.isSuccessful()); - - sleep(2000); + final String launcherId = submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); PigActionExecutor ae = new PigActionExecutor(); ae.check(context, context.getAction()); @@ -222,26 +186,25 @@ public class TestPigActionExecutor extends ActionExecutorTestCase { // Set the action xml with the option for retrieving stats to true String actionXml = setPigActionXml(PIG_SCRIPT, true); Context context = createContext(actionXml); - final RunningJob launcherJob = submitAction(context); - evaluateLauncherJob(launcherJob); - assertTrue(launcherJob.isSuccessful()); + final String launcherId = submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); Configuration conf = new XConfiguration(); conf.set("user.name", getTestUser()); - Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), + Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(), conf); - assertTrue(LauncherMapperHelper.hasStatsData(actionData)); + assertTrue(LauncherHelper.hasStatsData(actionData)); PigActionExecutor ae = new PigActionExecutor(); WorkflowAction wfAction = context.getAction(); ae.check(context, wfAction); ae.end(context, wfAction); - assertEquals("SUCCEEDED", wfAction.getExternalStatus()); + assertEquals(JavaActionExecutor.SUCCEEDED, wfAction.getExternalStatus()); String stats = wfAction.getStats(); assertNotNull(stats); // check for some of the expected key values in the stats - Map m = (Map)JSONValue.parse(stats); + Map m = (Map) JSONValue.parse(stats); // check for expected 1st level JSON keys assertTrue(m.containsKey("PIG_VERSION")); @@ -249,7 +212,7 @@ public class TestPigActionExecutor extends ActionExecutorTestCase { String[] childIDs = expectedChildIDs.split(","); assertTrue(m.containsKey(childIDs[0])); - Map q = (Map)m.get(childIDs[0]); + Map q = (Map) m.get(childIDs[0]); // check for expected 2nd level JSON keys assertTrue(q.containsKey("HADOOP_COUNTERS")); } @@ -275,9 +238,8 @@ public class TestPigActionExecutor extends ActionExecutorTestCase { // Set the action xml with the option for retrieving stats to false String actionXml = setPigActionXml(PIG_SCRIPT, false); Context context = createContext(actionXml); - final RunningJob launcherJob = submitAction(context); - evaluateLauncherJob(launcherJob); - assertTrue(launcherJob.isSuccessful()); + final String launcherId = submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); PigActionExecutor ae = new PigActionExecutor(); WorkflowAction wfAction = context.getAction(); @@ -305,9 +267,8 @@ public class TestPigActionExecutor extends ActionExecutorTestCase { // Set the action xml with the option for retrieving stats to true String actionXml = setPigActionXml(PIG_SCRIPT, true); Context context = createContext(actionXml); - final RunningJob launcherJob = submitAction(context); - evaluateLauncherJob(launcherJob); - assertTrue(launcherJob.isSuccessful()); + final String launcherId = submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); PigActionExecutor ae = new PigActionExecutor(); WorkflowAction wfAction = context.getAction(); @@ -327,15 +288,14 @@ public class TestPigActionExecutor extends ActionExecutorTestCase { // Set the action xml with the option for retrieving stats to false String actionXml = setPigActionXml(PIG_SCRIPT, false); Context context = createContext(actionXml); - final RunningJob launcherJob = submitAction(context); - evaluateLauncherJob(launcherJob); - assertTrue(launcherJob.isSuccessful()); + final String launcherId = submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); Configuration conf = new XConfiguration(); conf.set("user.name", getTestUser()); - Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), + Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(), conf); - assertFalse(LauncherMapperHelper.hasStatsData(actionData)); + assertFalse(LauncherHelper.hasStatsData(actionData)); PigActionExecutor ae = new PigActionExecutor(); WorkflowAction wfAction = context.getAction(); @@ -346,16 +306,6 @@ public class TestPigActionExecutor extends ActionExecutorTestCase { assertNotNull(wfAction.getExternalChildIDs()); } - private void evaluateLauncherJob(final RunningJob launcherJob) throws Exception{ - waitFor(180 * 1000, new Predicate() { - @Override - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - sleep(2000); - } - protected XConfiguration setPigConfig(boolean writeStats) { XConfiguration conf = new XConfiguration(); conf.set("oozie.pig.log.level", "INFO"); http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigMainWithOldAPI.java ---------------------------------------------------------------------- diff --git a/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigMainWithOldAPI.java b/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigMainWithOldAPI.java index e52e6fd..74de433 100644 --- a/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigMainWithOldAPI.java +++ b/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigMainWithOldAPI.java @@ -21,7 +21,6 @@ package org.apache.oozie.action.hadoop; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.oozie.action.hadoop.MainTestCase; -import org.apache.oozie.action.hadoop.MapReduceMain; import org.apache.oozie.action.hadoop.PigMainWithOldAPI; import org.apache.oozie.action.hadoop.SharelibUtils; import org.apache.oozie.test.XFsTestCase; @@ -97,9 +96,9 @@ public class TestPigMainWithOldAPI extends XFsTestCase implements Callable<Void> SharelibUtils.addToDistributedCache("pig", fs, getFsTestCaseDir(), jobConfiguration); String[] params = { "IN=" + inputDir.toUri().getPath(), "OUT=" + outputDir.toUri().getPath() }; - MapReduceMain.setStrings(jobConfiguration, "oozie.pig.params", params); + ActionUtils.setStrings(jobConfiguration, "oozie.pig.params", params); String[] args = { "-v" }; - MapReduceMain.setStrings(jobConfiguration, "oozie.pig.args", args); + ActionUtils.setStrings(jobConfiguration, "oozie.pig.args", args); File actionXml = new File(getTestCaseDir(), "action.xml"); OutputStream os = new FileOutputStream(actionXml); http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/spark/pom.xml ---------------------------------------------------------------------- diff --git a/sharelib/spark/pom.xml b/sharelib/spark/pom.xml index abef24f..dec505b 100644 --- a/sharelib/spark/pom.xml +++ b/sharelib/spark/pom.xml @@ -248,6 +248,103 @@ <artifactId>oozie-examples</artifactId> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-yarn_${spark.scala.binary.version}</artifactId> + <version>${spark.version}</version> + <scope>compile</scope> + <exclusions> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-client</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-api</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-common</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-common</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-web-proxy</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-shuffle</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-jobclient</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-common</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-app</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-annotations</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-auth</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-aws</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + </exclusion> + <exclusion> + <groupId>org.spark-project.hive</groupId> + <artifactId>hive-beeline</artifactId> + </exclusion> + <exclusion> + <groupId>org.spark-project.hive</groupId> + <artifactId>hive-common</artifactId> + </exclusion> + <exclusion> + <groupId>org.spark-project.hive</groupId> + <artifactId>hive-exec</artifactId> + </exclusion> + <exclusion> + <groupId>org.spark-project.hive</groupId> + <artifactId>hive-jdbc</artifactId> + </exclusion> + <exclusion> + <groupId>org.spark-project.hive</groupId> + <artifactId>hive-metastore</artifactId> + </exclusion> + <exclusion> + <groupId>org.spark-project.hive</groupId> + <artifactId>hive-serde</artifactId> + </exclusion> + <exclusion> + <groupId>org.spark-project.hive</groupId> + <artifactId>hive-service</artifactId> + </exclusion> + <exclusion> + <groupId>org.spark-project.hive</groupId> + <artifactId>hive-shims</artifactId> + </exclusion> + </exclusions> + </dependency> </dependencies> <build> @@ -273,37 +370,6 @@ <outputFile>${project.build.directory}/classpath</outputFile> </configuration> </execution> - <execution> - <id>create-mrapp-generated-classpath</id> - <phase>generate-test-resources</phase> - <goals> - <goal>build-classpath</goal> - </goals> - <configuration> - <!-- needed to run the unit test for DS to generate the required classpath - that is required in the env of the launch container in the mini mr/yarn cluster --> - <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile> - </configuration> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-antrun-plugin</artifactId> - <executions> - <execution> - <configuration> - <target> - <!-- needed to include Main class in classpath for mini yarn cluster for unit tests --> - <echo file="${project.build.directory}/test-classes/mrapp-generated-classpath" - append="true" message=":${project.build.directory}/classes"/> - </target> - </configuration> - <goals> - <goal>run</goal> - </goals> - <phase>generate-test-resources</phase> - </execution> </executions> </plugin> <plugin> @@ -319,109 +385,5 @@ </plugin> </plugins> </build> - - <profiles> - <profile> - <id>hadoop-2</id> - <dependencies> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-yarn_${spark.scala.binary.version}</artifactId> - <version>${spark.version}</version> - <scope>compile</scope> - <exclusions> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-client</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-api</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-common</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-server-common</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-server-web-proxy</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-shuffle</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-jobclient</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-core</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-common</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-app</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-annotations</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-auth</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-aws</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - </exclusion> - <exclusion> - <groupId>org.spark-project.hive</groupId> - <artifactId>hive-beeline</artifactId> - </exclusion> - <exclusion> - <groupId>org.spark-project.hive</groupId> - <artifactId>hive-common</artifactId> - </exclusion> - <exclusion> - <groupId>org.spark-project.hive</groupId> - <artifactId>hive-exec</artifactId> - </exclusion> - <exclusion> - <groupId>org.spark-project.hive</groupId> - <artifactId>hive-jdbc</artifactId> - </exclusion> - <exclusion> - <groupId>org.spark-project.hive</groupId> - <artifactId>hive-metastore</artifactId> - </exclusion> - <exclusion> - <groupId>org.spark-project.hive</groupId> - <artifactId>hive-serde</artifactId> - </exclusion> - <exclusion> - <groupId>org.spark-project.hive</groupId> - <artifactId>hive-service</artifactId> - </exclusion> - <exclusion> - <groupId>org.spark-project.hive</groupId> - <artifactId>hive-shims</artifactId> - </exclusion> - </exclusions> - </dependency> - </dependencies> - </profile> - </profiles> </project> http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java ---------------------------------------------------------------------- diff --git a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java index ffa934a..68f7a60 100644 --- a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java +++ b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java @@ -91,7 +91,7 @@ public class SparkMain extends LauncherMain { prepareHadoopConfig(actionConf); setYarnTag(actionConf); - LauncherMainHadoopUtils.killChildYarnJobs(actionConf); + LauncherMain.killChildYarnJobs(actionConf); String logFile = setUpSparkLog4J(actionConf); setHiveSite(actionConf); List<String> sparkArgs = new ArrayList<String>(); @@ -361,11 +361,16 @@ public class SparkMain extends LauncherMain { */ private static File getMatchingFile(Pattern fileNamePattern) throws OozieActionConfiguratorException { File localDir = new File("."); - for(String fileName : localDir.list()){ - if(fileNamePattern.matcher(fileName).find()){ - return new File(fileName); + String[] files = localDir.list(); + + if (files != null) { + for(String fileName : files){ + if(fileNamePattern.matcher(fileName).find()){ + return new File(fileName); + } } } + return null; } http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestPyspark.java ---------------------------------------------------------------------- diff --git a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestPyspark.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestPyspark.java index 458baaa..9d8d4aa 100644 --- a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestPyspark.java +++ b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestPyspark.java @@ -106,13 +106,8 @@ public class TestPyspark extends ActionExecutorTestCase { WorkflowAction.Status wfStatus) throws Exception { Context context = createContext(getActionXml(sparkOpts), wf); - final RunningJob launcherJob = submitAction(context); - waitFor(200 * 1000, new Predicate() { - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - assertTrue(launcherJob.isSuccessful()); + final String launcherId = submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); SparkActionExecutor ae = new SparkActionExecutor(); ae.check(context, context.getAction()); assertEquals(externalStatus, context.getAction().getExternalStatus()); @@ -120,7 +115,7 @@ public class TestPyspark extends ActionExecutorTestCase { assertEquals(wfStatus, context.getAction().getStatus()); } - protected RunningJob submitAction(Context context) throws Exception { + protected String submitAction(Context context) throws Exception { SparkActionExecutor ae = new SparkActionExecutor(); WorkflowAction action = context.getAction(); ae.prepareActionDir(getFileSystem(), context); @@ -131,12 +126,8 @@ public class TestPyspark extends ActionExecutorTestCase { assertNotNull(jobId); assertNotNull(jobTracker); assertNotNull(consoleUrl); - JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker); - jobConf.set("mapred.job.tracker", jobTracker); - JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(getTestUser(), jobConf); - final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId)); - assertNotNull(runningJob); - return runningJob; + + return jobId; } protected Context createContext(String actionXml, WorkflowJobBean wf) throws Exception { http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java ---------------------------------------------------------------------- diff --git a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java index 8c77be0..d97f1f0 100644 --- a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java +++ b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java @@ -25,6 +25,7 @@ import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobID; import org.apache.hadoop.mapred.RunningJob; +import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.WorkflowJobBean; import org.apache.oozie.client.WorkflowAction; @@ -175,13 +176,8 @@ public class TestSparkActionExecutor extends ActionExecutorTestCase { scriptWriter.close(); Context context = createContext(getActionXml()); - final RunningJob launcherJob = submitAction(context); - waitFor(200 * 1000, new Predicate() { - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - assertTrue(launcherJob.isSuccessful()); + final String launcherID = submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(launcherID); SparkActionExecutor ae = new SparkActionExecutor(); ae.check(context, context.getAction()); @@ -212,7 +208,7 @@ public class TestSparkActionExecutor extends ActionExecutorTestCase { return new Context(wf, action); } - protected RunningJob submitAction(Context context) throws Exception { + protected String submitAction(Context context) throws Exception { SparkActionExecutor ae = new SparkActionExecutor(); WorkflowAction action = context.getAction(); @@ -227,14 +223,7 @@ public class TestSparkActionExecutor extends ActionExecutorTestCase { assertNotNull(jobTracker); assertNotNull(consoleUrl); - JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker); - jobConf.set("mapred.job.tracker", jobTracker); - - JobClient jobClient = - Services.get().get(HadoopAccessorService.class).createJobClient(getTestUser(), jobConf); - final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId)); - assertNotNull(runningJob); - return runningJob; + return jobId; } http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/sqoop/pom.xml ---------------------------------------------------------------------- diff --git a/sharelib/sqoop/pom.xml b/sharelib/sqoop/pom.xml index d5afa37..11727de 100644 --- a/sharelib/sqoop/pom.xml +++ b/sharelib/sqoop/pom.xml @@ -214,11 +214,6 @@ <artifactId>oozie-sharelib-oozie</artifactId> <scope>provided</scope> </dependency> - <dependency> - <groupId>org.apache.oozie</groupId> - <artifactId>oozie-hadoop-utils</artifactId> - <scope>provided</scope> - </dependency> </dependencies> <build> @@ -244,18 +239,6 @@ <outputFile>${project.build.directory}/classpath</outputFile> </configuration> </execution> - <execution> - <id>create-mrapp-generated-classpath</id> - <phase>generate-test-resources</phase> - <goals> - <goal>build-classpath</goal> - </goals> - <configuration> - <!-- needed to run the unit test for DS to generate the required classpath - that is required in the env of the launch container in the mini mr/yarn cluster --> - <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile> - </configuration> - </execution> </executions> </plugin> <plugin> http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/sqoop/src/main/java/org/apache/oozie/action/hadoop/SqoopMain.java ---------------------------------------------------------------------- diff --git a/sharelib/sqoop/src/main/java/org/apache/oozie/action/hadoop/SqoopMain.java b/sharelib/sqoop/src/main/java/org/apache/oozie/action/hadoop/SqoopMain.java index 6672ffb..416f1ec 100644 --- a/sharelib/sqoop/src/main/java/org/apache/oozie/action/hadoop/SqoopMain.java +++ b/sharelib/sqoop/src/main/java/org/apache/oozie/action/hadoop/SqoopMain.java @@ -152,13 +152,13 @@ public class SqoopMain extends LauncherMain { Configuration sqoopConf = setUpSqoopSite(); String logFile = setUpSqoopLog4J(sqoopConf); - String[] sqoopArgs = MapReduceMain.getStrings(sqoopConf, SqoopActionExecutor.SQOOP_ARGS); + String[] sqoopArgs = ActionUtils.getStrings(sqoopConf, SqoopActionExecutor.SQOOP_ARGS); if (sqoopArgs == null) { throw new RuntimeException("Action Configuration does not have [" + SqoopActionExecutor.SQOOP_ARGS + "] property"); } - LauncherMapper.printArgs("Sqoop command arguments :", sqoopArgs); - LauncherMainHadoopUtils.killChildYarnJobs(sqoopConf); + printArgs("Sqoop command arguments :", sqoopArgs); + LauncherMain.killChildYarnJobs(sqoopConf); System.out.println("================================================================="); System.out.println(); @@ -169,13 +169,6 @@ public class SqoopMain extends LauncherMain { try { runSqoopJob(sqoopArgs); } - catch (SecurityException ex) { - if (LauncherSecurityManager.getExitInvoked()) { - if (LauncherSecurityManager.getExitCode() != 0) { - throw ex; - } - } - } finally { System.out.println("\n<<< Invocation of Sqoop command completed <<<\n"); writeExternalChildIDs(logFile, SQOOP_JOB_IDS_PATTERNS, "Sqoop"); http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/sqoop/src/test/java/org/apache/oozie/action/hadoop/TestSqoopActionExecutor.java ---------------------------------------------------------------------- diff --git a/sharelib/sqoop/src/test/java/org/apache/oozie/action/hadoop/TestSqoopActionExecutor.java b/sharelib/sqoop/src/test/java/org/apache/oozie/action/hadoop/TestSqoopActionExecutor.java index 3dfd606..edfe0c7 100644 --- a/sharelib/sqoop/src/test/java/org/apache/oozie/action/hadoop/TestSqoopActionExecutor.java +++ b/sharelib/sqoop/src/test/java/org/apache/oozie/action/hadoop/TestSqoopActionExecutor.java @@ -22,21 +22,12 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobID; -import org.apache.hadoop.mapred.RunningJob; import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.WorkflowJobBean; import org.apache.oozie.client.WorkflowAction; -import org.apache.oozie.service.HadoopAccessorService; -import org.apache.oozie.service.Services; import org.apache.oozie.service.WorkflowAppService; import org.apache.oozie.util.IOUtils; import org.apache.oozie.util.XConfiguration; -import org.apache.oozie.util.XmlUtils; -import org.jdom.Element; -import org.jdom.Namespace; import java.io.BufferedReader; import java.io.File; @@ -44,7 +35,6 @@ import java.io.FileInputStream; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; -import java.io.StringReader; import java.sql.Connection; import java.sql.DriverManager; import java.sql.Statement; @@ -209,17 +199,12 @@ public class TestSqoopActionExecutor extends ActionExecutorTestCase { createDB(); Context context = createContext(actionXml); - final RunningJob launcherJob = submitAction(context); - String launcherId = context.getAction().getExternalId(); - waitFor(120 * 1000, new Predicate() { - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - assertTrue(launcherJob.isSuccessful()); - Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), + final String launcherId = submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); + + Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(), context.getProtoActionConf()); - assertFalse(LauncherMapperHelper.hasIdSwap(actionData)); + assertFalse(LauncherHelper.hasIdSwap(actionData)); SqoopActionExecutor ae = new SqoopActionExecutor(); ae.check(context, context.getAction()); @@ -248,17 +233,11 @@ public class TestSqoopActionExecutor extends ActionExecutorTestCase { createDB(); Context context = createContext(actionXml); - final RunningJob launcherJob = submitAction(context); - String launcherId = context.getAction().getExternalId(); - waitFor(120 * 1000, new Predicate() { - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - assertTrue(launcherJob.isSuccessful()); - Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), + final String launcherId = submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); + Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(), context.getProtoActionConf()); - assertFalse(LauncherMapperHelper.hasIdSwap(actionData)); + assertFalse(LauncherHelper.hasIdSwap(actionData)); SqoopActionExecutor ae = new SqoopActionExecutor(); ae.check(context, context.getAction()); @@ -289,17 +268,11 @@ public class TestSqoopActionExecutor extends ActionExecutorTestCase { createDB(); Context context = createContext(getActionXmlEval()); - final RunningJob launcherJob = submitAction(context); - String launcherId = context.getAction().getExternalId(); - waitFor(120 * 1000, new Predicate() { - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - assertTrue(launcherJob.isSuccessful()); - Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), + final String launcherId = submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); + Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(), context.getProtoActionConf()); - assertFalse(LauncherMapperHelper.hasIdSwap(actionData)); + assertFalse(LauncherHelper.hasIdSwap(actionData)); SqoopActionExecutor ae = new SqoopActionExecutor(); ae.check(context, context.getAction()); @@ -341,17 +314,11 @@ public class TestSqoopActionExecutor extends ActionExecutorTestCase { createDB(); Context context = createContext(actionXml); - final RunningJob launcherJob = submitAction(context); - String launcherId = context.getAction().getExternalId(); - waitFor(120 * 1000, new Predicate() { - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - assertTrue(launcherJob.isSuccessful()); - Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), + final String launcherId = submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); + Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(), context.getProtoActionConf()); - assertFalse(LauncherMapperHelper.hasIdSwap(actionData)); + assertFalse(LauncherHelper.hasIdSwap(actionData)); SqoopActionExecutor ae = new SqoopActionExecutor(); ae.check(context, context.getAction()); @@ -387,7 +354,7 @@ public class TestSqoopActionExecutor extends ActionExecutorTestCase { } - private RunningJob submitAction(Context context) throws Exception { + private String submitAction(Context context) throws Exception { SqoopActionExecutor ae = new SqoopActionExecutor(); WorkflowAction action = context.getAction(); @@ -401,24 +368,7 @@ public class TestSqoopActionExecutor extends ActionExecutorTestCase { assertNotNull(jobId); assertNotNull(jobTracker); assertNotNull(consoleUrl); - Element e = XmlUtils.parseXml(action.getConf()); - Namespace ns = Namespace.getNamespace("uri:oozie:sqoop-action:0.1"); - XConfiguration conf = new XConfiguration( - new StringReader(XmlUtils.prettyPrint(e.getChild("configuration", ns)).toString())); - conf.set("mapred.job.tracker", e.getChildTextTrim("job-tracker", ns)); - conf.set("fs.default.name", e.getChildTextTrim("name-node", ns)); - conf.set("user.name", context.getProtoActionConf().get("user.name")); - conf.set("mapreduce.framework.name", "yarn"); - conf.set("group.name", getTestGroup()); - - JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker); - XConfiguration.copy(conf, jobConf); - String user = jobConf.get("user.name"); - String group = jobConf.get("group.name"); - JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf); - final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId)); - assertNotNull(runningJob); - return runningJob; + return jobId; } private Context createContext(String actionXml) throws Exception { http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/streaming/pom.xml ---------------------------------------------------------------------- diff --git a/sharelib/streaming/pom.xml b/sharelib/streaming/pom.xml index 4f73272..d65c396 100644 --- a/sharelib/streaming/pom.xml +++ b/sharelib/streaming/pom.xml @@ -62,6 +62,12 @@ </dependency> <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <scope>test</scope> + </dependency> + + <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <scope>provided</scope> http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/streaming/src/main/java/org/apache/oozie/action/hadoop/StreamingMain.java ---------------------------------------------------------------------- diff --git a/sharelib/streaming/src/main/java/org/apache/oozie/action/hadoop/StreamingMain.java b/sharelib/streaming/src/main/java/org/apache/oozie/action/hadoop/StreamingMain.java index 991bf7e..cc55166 100644 --- a/sharelib/streaming/src/main/java/org/apache/oozie/action/hadoop/StreamingMain.java +++ b/sharelib/streaming/src/main/java/org/apache/oozie/action/hadoop/StreamingMain.java @@ -56,12 +56,12 @@ public class StreamingMain extends MapReduceMain { if (value != null) { jobConf.set("stream.recordreader.class", value); } - String[] values = getStrings(actionConf, "oozie.streaming.record-reader-mapping"); + String[] values = ActionUtils.getStrings(actionConf, "oozie.streaming.record-reader-mapping"); for (String s : values) { String[] kv = s.split("="); jobConf.set("stream.recordreader." + kv[0], kv[1]); } - values = getStrings(actionConf, "oozie.streaming.env"); + values = ActionUtils.getStrings(actionConf, "oozie.streaming.env"); value = jobConf.get("stream.addenvironment", ""); if (value.length() > 0) { value = value + " "; http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java ---------------------------------------------------------------------- diff --git a/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java b/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java index 5f9f38e..045f174 100644 --- a/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java +++ b/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java @@ -18,46 +18,17 @@ package org.apache.oozie.action.hadoop; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.filecache.DistributedCache; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.RunningJob; -import org.apache.hadoop.mapred.JobID; -import org.apache.hadoop.streaming.StreamJob; -import org.apache.oozie.WorkflowActionBean; -import org.apache.oozie.WorkflowJobBean; -import org.apache.oozie.client.OozieClient; -import org.apache.oozie.client.WorkflowAction; -import org.apache.oozie.client.WorkflowAction.Status; -import org.apache.oozie.command.wf.StartXCommand; -import org.apache.oozie.command.wf.SubmitXCommand; -import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor; -import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery; -import org.apache.oozie.service.WorkflowAppService; -import org.apache.oozie.service.Services; -import org.apache.oozie.service.HadoopAccessorService; -import org.apache.oozie.util.XConfiguration; -import org.apache.oozie.util.XmlUtils; -import org.apache.oozie.util.IOUtils; -import org.apache.oozie.util.ClassUtils; -import org.jdom.Element; - import java.io.File; +import java.io.FileInputStream; import java.io.FileNotFoundException; +import java.io.FileOutputStream; import java.io.FileWriter; import java.io.IOException; -import java.io.OutputStream; import java.io.InputStream; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.Writer; +import java.io.OutputStream; import java.io.OutputStreamWriter; import java.io.StringReader; +import java.io.Writer; import java.net.URI; import java.util.Arrays; import java.util.List; @@ -68,14 +39,45 @@ import java.util.jar.JarOutputStream; import java.util.regex.Pattern; import java.util.zip.ZipEntry; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.filecache.DistributedCache; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapred.RunningJob; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.streaming.StreamJob; +import org.apache.oozie.WorkflowActionBean; +import org.apache.oozie.WorkflowJobBean; import org.apache.oozie.action.ActionExecutorException; +import org.apache.oozie.client.OozieClient; +import org.apache.oozie.client.WorkflowAction; +import org.apache.oozie.client.WorkflowAction.Status; +import org.apache.oozie.command.wf.StartXCommand; +import org.apache.oozie.command.wf.SubmitXCommand; +import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor; +import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery; +import org.apache.oozie.service.HadoopAccessorService; +import org.apache.oozie.service.Services; +import org.apache.oozie.service.WorkflowAppService; +import org.apache.oozie.util.ClassUtils; +import org.apache.oozie.util.IOUtils; import org.apache.oozie.util.PropertiesUtils; +import org.apache.oozie.util.XConfiguration; +import org.apache.oozie.util.XmlUtils; +import org.jdom.Element; public class TestMapReduceActionExecutor extends ActionExecutorTestCase { + private static final String PIPES = "pipes"; + private static final String MAP_REDUCE = "map-reduce"; + @Override protected void setSystemProps() throws Exception { super.setSystemProps(); @@ -212,10 +214,10 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { assertEquals("global-output-dir", actionConf.get("outputDir")); } - @SuppressWarnings("unchecked") public void testSetupMethods() throws Exception { MapReduceActionExecutor ae = new MapReduceActionExecutor(); - assertEquals(Arrays.asList(StreamingMain.class), ae.getLauncherClasses()); + List<Class<?>> classes = Arrays.<Class<?>>asList(StreamingMain.class); + assertEquals(classes, ae.getLauncherClasses()); Element actionXml = XmlUtils.parseXml("<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>" + getNameNodeUri() + "</name-node>" + "<configuration>" @@ -226,7 +228,6 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { XConfiguration protoConf = new XConfiguration(); protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser()); - WorkflowJobBean wf = createBaseWorkflow(protoConf, "mr-action"); WorkflowActionBean action = (WorkflowActionBean) wf.getActions().get(0); action.setType(ae.getType()); @@ -236,7 +237,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { Configuration conf = ae.createBaseHadoopConf(context, actionXml); ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir()); assertEquals("IN", conf.get("mapred.input.dir")); - JobConf launcherJobConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml, conf); + Configuration launcherJobConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml, conf); assertEquals(false, launcherJobConf.getBoolean("mapreduce.job.complete.cancel.delegation.tokens", true)); assertEquals(true, conf.getBoolean("mapreduce.job.complete.cancel.delegation.tokens", false)); @@ -248,45 +249,36 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { actionXml = createUberJarActionXML(getNameNodeUri() + "/app/job.jar", ""); conf = ae.createBaseHadoopConf(context, actionXml); ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir()); - assertEquals(getNameNodeUri() + "/app/job.jar", conf.get("oozie.mapreduce.uber.jar")); // absolute path with namenode - launcherJobConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml, conf); - assertEquals(getNameNodeUri() + "/app/job.jar", launcherJobConf.getJar()); // same for launcher conf + // absolute path with namenode + assertEquals(getNameNodeUri() + "/app/job.jar", conf.get(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR)); actionXml = createUberJarActionXML("/app/job.jar", ""); conf = ae.createBaseHadoopConf(context, actionXml); ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir()); - assertEquals(getNameNodeUri() + "/app/job.jar", conf.get("oozie.mapreduce.uber.jar")); // absolute path without namenode - launcherJobConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml, conf); - assertEquals(getNameNodeUri() + "/app/job.jar", launcherJobConf.getJar()); // same for launcher conf + // absolute path without namenode + assertEquals(getNameNodeUri() + "/app/job.jar", conf.get(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR)); actionXml = createUberJarActionXML("job.jar", ""); conf = ae.createBaseHadoopConf(context, actionXml); ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir()); - assertEquals(getFsTestCaseDir() + "/job.jar", conf.get("oozie.mapreduce.uber.jar")); // relative path - launcherJobConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml, conf); - assertEquals(getFsTestCaseDir() + "/job.jar", launcherJobConf.getJar()); // same for launcher + assertEquals(getFsTestCaseDir() + "/job.jar", conf.get(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR)); // relative path actionXml = createUberJarActionXML("job.jar", "<streaming></streaming>"); conf = ae.createBaseHadoopConf(context, actionXml); ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir()); - assertEquals("", conf.get("oozie.mapreduce.uber.jar")); // ignored for streaming - launcherJobConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml, conf); - assertNull(launcherJobConf.getJar()); // same for launcher conf (not set) + // ignored for streaming + assertEquals("", conf.get(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR)); actionXml = createUberJarActionXML("job.jar", "<pipes></pipes>"); conf = ae.createBaseHadoopConf(context, actionXml); ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir()); assertEquals("", conf.get("oozie.mapreduce.uber.jar")); // ignored for pipes - launcherJobConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml, conf); - assertNull(launcherJobConf.getJar()); // same for launcher conf (not set) actionXml = XmlUtils.parseXml("<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>" + getNameNodeUri() + "</name-node>" + "</map-reduce>"); conf = ae.createBaseHadoopConf(context, actionXml); ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir()); assertNull(conf.get("oozie.mapreduce.uber.jar")); // doesn't resolve if not set - launcherJobConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml, conf); - assertNull(launcherJobConf.getJar()); // same for launcher conf // Disable uber jars to test that MapReduceActionExecutor won't allow the oozie.mapreduce.uber.jar property serv.getConf().setBoolean("oozie.action.mapreduce.uber.jar.enable", false); @@ -386,7 +378,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { return new Context(wf, action); } - protected RunningJob submitAction(Context context) throws Exception { + protected String submitAction(Context context) throws Exception { MapReduceActionExecutor ae = new MapReduceActionExecutor(); WorkflowAction action = context.getAction(); @@ -394,54 +386,25 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { ae.prepareActionDir(getFileSystem(), context); ae.submitLauncher(getFileSystem(), context, action); - String jobId = action.getExternalId(); - String jobTracker = action.getTrackerUri(); - String consoleUrl = action.getConsoleUrl(); - assertNotNull(jobId); - assertNotNull(jobTracker); - assertNotNull(consoleUrl); - - Element e = XmlUtils.parseXml(action.getConf()); - XConfiguration conf = new XConfiguration(new StringReader(XmlUtils.prettyPrint(e.getChild("configuration")) - .toString())); - conf.set("mapred.job.tracker", e.getChildTextTrim("job-tracker")); - conf.set("fs.default.name", e.getChildTextTrim("name-node")); - conf.set("user.name", context.getProtoActionConf().get("user.name")); - conf.set("group.name", getTestGroup()); - - conf.set("mapreduce.framework.name", "yarn"); - JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker); - XConfiguration.copy(conf, jobConf); - String user = jobConf.get("user.name"); - String group = jobConf.get("group.name"); - JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf); - final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId)); - assertNotNull(runningJob); - return runningJob; + return context.getAction().getExternalId(); } private String _testSubmit(String name, String actionXml) throws Exception { Context context = createContext(name, actionXml); - final RunningJob launcherJob = submitAction(context); - String launcherId = context.getAction().getExternalId(); - waitFor(120 * 2000, new Predicate() { - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - assertTrue(launcherJob.isSuccessful()); - Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), + final String launcherId = submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); + + Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(), context.getProtoActionConf()); - assertTrue(LauncherMapperHelper.hasIdSwap(actionData)); + assertTrue(LauncherHelper.hasIdSwap(actionData)); MapReduceActionExecutor ae = new MapReduceActionExecutor(); ae.check(context, context.getAction()); assertTrue(launcherId.equals(context.getAction().getExternalId())); - JobConf conf = ae.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml)); + Configuration conf = ae.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml)); String user = conf.get("user.name"); - String group = conf.get("group.name"); JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf); final RunningJob mrJob = jobClient.getJob(JobID.forName(context.getAction().getExternalChildIDs())); @@ -453,7 +416,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { assertTrue(mrJob.isSuccessful()); ae.check(context, context.getAction()); - assertEquals("SUCCEEDED", context.getAction().getExternalStatus()); + assertEquals(JavaActionExecutor.SUCCEEDED, context.getAction().getExternalStatus()); assertNull(context.getAction().getData()); ae.end(context, context.getAction()); @@ -471,28 +434,37 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { return mrJob.getID().toString(); } + private void _testSubmitError(String actionXml, String errorMessage) throws Exception { + Context context = createContext(MAP_REDUCE, actionXml); + final String launcherId = submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); + + MapReduceActionExecutor ae = new MapReduceActionExecutor(); + ae.check(context, context.getAction()); + + assertEquals(JavaActionExecutor.FAILED_KILLED, context.getAction().getExternalStatus()); + + ae.end(context, context.getAction()); + assertEquals(WorkflowAction.Status.ERROR, context.getAction().getStatus()); + assertTrue(context.getAction().getErrorMessage().contains("already exists")); + } + private void _testSubmitWithCredentials(String name, String actionXml) throws Exception { - Context context = createContextWithCredentials("map-reduce", actionXml); - final RunningJob launcherJob = submitAction(context); - String launcherId = context.getAction().getExternalId(); - waitFor(120 * 1000, new Predicate() { - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - assertTrue(launcherJob.isSuccessful()); - Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), + Context context = createContextWithCredentials(MAP_REDUCE, actionXml); + final String launcherId = submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); + + Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(), context.getProtoActionConf()); - assertTrue(LauncherMapperHelper.hasIdSwap(actionData)); + assertTrue(LauncherHelper.hasIdSwap(actionData)); MapReduceActionExecutor ae = new MapReduceActionExecutor(); ae.check(context, context.getAction()); assertTrue(launcherId.equals(context.getAction().getExternalId())); - JobConf conf = ae.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml)); + Configuration conf = ae.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml)); String user = conf.get("user.name"); - String group = conf.get("group.name"); JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf); final RunningJob mrJob = jobClient.getJob(JobID.forName(context.getAction().getExternalChildIDs())); @@ -504,7 +476,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { assertTrue(mrJob.isSuccessful()); ae.check(context, context.getAction()); - assertEquals("SUCCEEDED", context.getAction().getExternalStatus()); + assertEquals(JavaActionExecutor.SUCCEEDED, context.getAction().getExternalStatus()); assertNull(context.getAction().getData()); ae.end(context, context.getAction()); @@ -513,6 +485,12 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { assertTrue(MapperReducerCredentialsForTest.hasCredentials(mrJob)); } + protected XConfiguration getSleepMapReduceConfig(String inputDir, String outputDir) { + XConfiguration conf = getMapReduceConfig(inputDir, outputDir); + conf.set("mapred.mapper.class", BlockingMapper.class.getName()); + return conf; + } + protected XConfiguration getMapReduceConfig(String inputDir, String outputDir) { XConfiguration conf = new XConfiguration(); conf.set("mapred.mapper.class", MapperReducerForTest.class.getName()); @@ -555,7 +533,37 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>" + getNameNodeUri() + "</name-node>" + getMapReduceConfig(inputDir.toString(), outputDir.toString()).toXmlString(false) + "</map-reduce>"; - _testSubmit("map-reduce", actionXml); + _testSubmit(MAP_REDUCE, actionXml); + } + + public void testMapReduceActionError() throws Exception { + FileSystem fs = getFileSystem(); + + Path inputDir = new Path(getFsTestCaseDir(), "input"); + Path outputDir = new Path(getFsTestCaseDir(), "output1"); + + Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt"))); + w.write("dummy\n"); + w.write("dummy\n"); + Writer ow = new OutputStreamWriter(fs.create(new Path(outputDir, "data.txt"))); + ow.write("dummy\n"); + ow.write("dummy\n"); + ow.close(); + + String actionXml = "<map-reduce>" + + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + + "<name-node>" + getNameNodeUri() + "</name-node>" + + "<configuration>" + + "<property><name>mapred.mapper.class</name><value>" + MapperReducerForTest.class.getName() + + "</value></property>" + + "<property><name>mapred.reducer.class</name><value>" + MapperReducerForTest.class.getName() + + "</value></property>" + + "<property><name>mapred.input.dir</name><value>" + inputDir + "</value></property>" + + "<property><name>mapred.output.dir</name><value>" + outputDir + "</value></property>" + + "</configuration>" + + "</map-reduce>"; + + _testSubmitError(actionXml, "already exists"); } public void testMapReduceWithConfigClass() throws Exception { @@ -569,7 +577,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { w.write("dummy\n"); w.close(); - Path jobXml = new Path(getFsTestCaseDir(), "job.xml"); + Path jobXml = new Path(getFsTestCaseDir(), "action.xml"); XConfiguration conf = getMapReduceConfig(inputDir.toString(), outputDir.toString()); conf.set(MapperReducerForTest.JOB_XML_OUTPUT_LOCATION, jobXml.toUri().toString()); conf.set("B", "b"); @@ -578,9 +586,10 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { + conf.toXmlString(false) + "<config-class>" + OozieActionConfiguratorForTest.class.getName() + "</config-class>" + "</map-reduce>"; - _testSubmit("map-reduce", actionXml); + _testSubmit(MAP_REDUCE, actionXml); Configuration conf2 = new Configuration(false); conf2.addResource(fs.open(jobXml)); + assertEquals("a", conf2.get("A")); assertEquals("c", conf2.get("B")); } @@ -601,18 +610,11 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { + getMapReduceConfig(inputDir.toString(), outputDir.toString()).toXmlString(false) + "<config-class>org.apache.oozie.does.not.exist</config-class>" + "</map-reduce>"; - Context context = createContext("map-reduce", actionXml); - final RunningJob launcherJob = submitAction(context); - waitFor(120 * 2000, new Predicate() { - @Override - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - assertTrue(launcherJob.isSuccessful()); - assertFalse(LauncherMapperHelper.isMainSuccessful(launcherJob)); + Context context = createContext(MAP_REDUCE, actionXml); + final String launcherId = submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); - final Map<String, String> actionData = LauncherMapperHelper.getActionData(fs, context.getActionDir(), + final Map<String, String> actionData = LauncherHelper.getActionData(fs, context.getActionDir(), context.getProtoActionConf()); Properties errorProps = PropertiesUtils.stringToProperties(actionData.get(LauncherMapper.ACTION_DATA_ERROR_PROPS)); assertEquals("An Exception occurred while instantiating the action config class", @@ -638,24 +640,55 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { + conf.toXmlString(false) + "<config-class>" + OozieActionConfiguratorForTest.class.getName() + "</config-class>" + "</map-reduce>"; - Context context = createContext("map-reduce", actionXml); - final RunningJob launcherJob = submitAction(context); - waitFor(120 * 2000, new Predicate() { - @Override - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - assertTrue(launcherJob.isSuccessful()); - assertFalse(LauncherMapperHelper.isMainSuccessful(launcherJob)); + Context context = createContext(MAP_REDUCE, actionXml); + final String launcherId = submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); - final Map<String, String> actionData = LauncherMapperHelper.getActionData(fs, context.getActionDir(), + final Map<String, String> actionData = LauncherHelper.getActionData(fs, context.getActionDir(), context.getProtoActionConf()); Properties errorProps = PropertiesUtils.stringToProperties(actionData.get(LauncherMapper.ACTION_DATA_ERROR_PROPS)); assertEquals("doh", errorProps.getProperty("exception.message")); assertTrue(errorProps.getProperty("exception.stacktrace").startsWith(OozieActionConfiguratorException.class.getName())); } + public void testMapReduceActionKill() throws Exception { + FileSystem fs = getFileSystem(); + + Path inputDir = new Path(getFsTestCaseDir(), "input"); + Path outputDir = new Path(getFsTestCaseDir(), "output"); + + Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt"))); + w.write("dummy\n"); + w.write("dummy\n"); + w.close(); + + String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>" + + getNameNodeUri() + "</name-node>" + + getSleepMapReduceConfig(inputDir.toString(), outputDir.toString()).toXmlString(false) + "</map-reduce>"; + + Context context = createContext(MAP_REDUCE, actionXml); + final String launcherId = submitAction(context); + // wait until LauncherAM terminates - the MR job keeps running the background + waitUntilYarnAppDoneAndAssertSuccess(launcherId); + + MapReduceActionExecutor mae = new MapReduceActionExecutor(); + mae.check(context, context.getAction()); // must be called so that externalChildIDs are read from HDFS + Configuration conf = mae.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml)); + String user = conf.get("user.name"); + JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf); + final RunningJob mrJob = jobClient.getJob(JobID.forName(context.getAction().getExternalChildIDs())); + + mae.kill(context, context.getAction()); + + waitFor(10_000, new Predicate() { + @Override + public boolean evaluate() throws Exception { + return mrJob.isComplete(); + } + }); + assertEquals(JobStatus.State.KILLED, mrJob.getJobStatus().getState()); + } + public void testMapReduceWithCredentials() throws Exception { FileSystem fs = getFileSystem(); @@ -671,7 +704,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { + getNameNodeUri() + "</name-node>" + getMapReduceCredentialsConfig(inputDir.toString(), outputDir.toString()).toXmlString(false) + "</map-reduce>"; - _testSubmitWithCredentials("map-reduce", actionXml); + _testSubmitWithCredentials(MAP_REDUCE, actionXml); } protected Path createAndUploadUberJar() throws Exception { @@ -734,7 +767,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>" + getNameNodeUri() + "</name-node>" + getMapReduceUberJarConfig(inputDir.toString(), outputDir.toString()).toXmlString(false) + "</map-reduce>"; - String jobID = _testSubmit("map-reduce", actionXml); + String jobID = _testSubmit(MAP_REDUCE, actionXml); boolean containsLib1Jar = false; String lib1JarStr = "jobcache/" + jobID + "/jars/lib/lib1.jar"; @@ -914,7 +947,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { + "#wordcount-simple" + "</program>" + " </pipes>" + getPipesConfig(inputDir.toString(), outputDir.toString()).toXmlString(false) + "<file>" + programPath + "</file>" + "</map-reduce>"; - _testSubmit("pipes", actionXml); + _testSubmit(PIPES, actionXml); } else { System.out.println( @@ -948,22 +981,16 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { + getOozieActionExternalStatsWriteProperty(inputDir.toString(), outputDir.toString(), "true") .toXmlString(false) + "</map-reduce>"; - Context context = createContext("map-reduce", actionXml); - final RunningJob launcherJob = submitAction(context); - String launcherId = context.getAction().getExternalId(); - waitFor(120 * 1000, new Predicate() { - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - assertTrue(launcherJob.isSuccessful()); + Context context = createContext(MAP_REDUCE, actionXml); + final String launcherId = submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); MapReduceActionExecutor ae = new MapReduceActionExecutor(); - JobConf conf = ae.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml)); + Configuration conf = ae.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml)); - Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), + Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(), conf); - assertTrue(LauncherMapperHelper.hasIdSwap(actionData)); + assertTrue(LauncherHelper.hasIdSwap(actionData)); ae.check(context, context.getAction()); assertTrue(launcherId.equals(context.getAction().getExternalId())); @@ -981,7 +1008,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { assertTrue(mrJob.isSuccessful()); ae.check(context, context.getAction()); - assertEquals("SUCCEEDED", context.getAction().getExternalStatus()); + assertEquals(JavaActionExecutor.SUCCEEDED, context.getAction().getExternalStatus()); assertNull(context.getAction().getData()); ae.end(context, context.getAction()); @@ -1026,24 +1053,19 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { + getOozieActionExternalStatsWriteProperty(inputDir.toString(), outputDir.toString(), "false") .toXmlString(false) + "</map-reduce>"; - Context context = createContext("map-reduce", actionXml); - final RunningJob launcherJob = submitAction(context); - String launcherId = context.getAction().getExternalId(); - waitFor(120 * 2000, new Predicate() { - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - assertTrue(launcherJob.isSuccessful()); - Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), + Context context = createContext(MAP_REDUCE, actionXml); + final String launcherId = submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); + + Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(), context.getProtoActionConf()); - assertTrue(LauncherMapperHelper.hasIdSwap(actionData)); + assertTrue(LauncherHelper.hasIdSwap(actionData)); MapReduceActionExecutor ae = new MapReduceActionExecutor(); ae.check(context, context.getAction()); assertTrue(launcherId.equals(context.getAction().getExternalId())); - JobConf conf = ae.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml)); + Configuration conf = ae.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml)); String user = conf.get("user.name"); String group = conf.get("group.name"); JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf); @@ -1057,7 +1079,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { assertTrue(mrJob.isSuccessful()); ae.check(context, context.getAction()); - assertEquals("SUCCEEDED", context.getAction().getExternalStatus()); + assertEquals(JavaActionExecutor.SUCCEEDED, context.getAction().getExternalStatus()); assertNull(context.getAction().getData()); ae.end(context, context.getAction()); @@ -1098,24 +1120,19 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { + getOozieActionExternalStatsWriteProperty(inputDir.toString(), outputDir.toString(), "false") .toXmlString(false) + "</map-reduce>"; - Context context = createContext("map-reduce", actionXml); - final RunningJob launcherJob = submitAction(context); - String launcherId = context.getAction().getExternalId(); - waitFor(120 * 2000, new Predicate() { - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - assertTrue(launcherJob.isSuccessful()); - Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), + Context context = createContext(MAP_REDUCE, actionXml); + final String launcherId = submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); + + Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(), context.getProtoActionConf()); - assertTrue(LauncherMapperHelper.hasIdSwap(actionData)); + assertTrue(LauncherHelper.hasIdSwap(actionData)); MapReduceActionExecutor ae = new MapReduceActionExecutor(); ae.check(context, context.getAction()); assertTrue(launcherId.equals(context.getAction().getExternalId())); - JobConf conf = ae.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml)); + Configuration conf = ae.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml)); String user = conf.get("user.name"); String group = conf.get("group.name"); JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf); @@ -1129,7 +1146,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { assertTrue(mrJob.isSuccessful()); ae.check(context, context.getAction()); - assertEquals("SUCCEEDED", context.getAction().getExternalStatus()); + assertEquals(JavaActionExecutor.SUCCEEDED, context.getAction().getExternalStatus()); assertNull(context.getAction().getData()); actionXml = "<map-reduce>" @@ -1185,35 +1202,24 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { .append(mrConfig.toXmlString(false)).append("</map-reduce>"); String actionXml = sb.toString(); - Context context = createContext("map-reduce", actionXml); - final RunningJob launcherJob = submitAction(context); - String launcherId = context.getAction().getExternalId(); - waitFor(120 * 2000, new Predicate() { - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); + Context context = createContext(MAP_REDUCE, actionXml); + final String launcherId = submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); - assertTrue(launcherJob.isSuccessful()); - Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), + Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(), context.getProtoActionConf()); - assertTrue(LauncherMapperHelper.hasIdSwap(actionData)); - // Assert launcher job name has been set - System.out.println("Launcher job name: " + launcherJob.getJobName()); - assertTrue(launcherJob.getJobName().equals(launcherJobName)); + assertTrue(LauncherHelper.hasIdSwap(actionData)); MapReduceActionExecutor ae = new MapReduceActionExecutor(); ae.check(context, context.getAction()); assertTrue(launcherId.equals(context.getAction().getExternalId())); - JobConf conf = ae.createBaseHadoopConf(context, - XmlUtils.parseXml(actionXml)); + Configuration conf = ae.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml)); String user = conf.get("user.name"); JobClient jobClient = Services.get().get(HadoopAccessorService.class) .createJobClient(user, conf); - final RunningJob mrJob = jobClient.getJob(JobID.forName(context - .getAction().getExternalChildIDs())); + final RunningJob mrJob = jobClient.getJob(JobID.forName(context.getAction().getExternalChildIDs())); waitFor(120 * 1000, new Predicate() { public boolean evaluate() throws Exception { @@ -1223,7 +1229,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { assertTrue(mrJob.isSuccessful()); ae.check(context, context.getAction()); - assertEquals("SUCCEEDED", context.getAction().getExternalStatus()); + assertEquals(JavaActionExecutor.SUCCEEDED, context.getAction().getExternalStatus()); assertNull(context.getAction().getData()); ae.end(context, context.getAction()); @@ -1304,7 +1310,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { Element eActionXml = XmlUtils.parseXml(actionXml); - Context context = createContext("map-reduce", actionXml); + Context context = createContext(MAP_REDUCE, actionXml); Path appPath = getAppPath(); http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/tools/src/main/java/org/apache/oozie/tools/OozieSharelibCLI.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/oozie/tools/OozieSharelibCLI.java b/tools/src/main/java/org/apache/oozie/tools/OozieSharelibCLI.java index 9aa4cb6..4e8bb4b 100644 --- a/tools/src/main/java/org/apache/oozie/tools/OozieSharelibCLI.java +++ b/tools/src/main/java/org/apache/oozie/tools/OozieSharelibCLI.java @@ -47,6 +47,8 @@ import org.apache.oozie.service.HadoopAccessorService; import org.apache.oozie.service.Services; import org.apache.oozie.service.WorkflowAppService; +import com.google.common.base.Preconditions; + public class OozieSharelibCLI { public static final String[] HELP_INFO = { "", @@ -255,20 +257,27 @@ public class OozieSharelibCLI { private List<Future<Void>> copyFolderRecursively(final FileSystem fs, final ExecutorService threadPool, File srcFile, final Path dstPath) throws IOException { List<Future<Void>> taskList = new ArrayList<Future<Void>>(); - for (final File file : srcFile.listFiles()) { - final Path trgName = new Path(dstPath, file.getName()); - if (file.isDirectory()) { - taskList.addAll(copyFolderRecursively(fs, threadPool, file, trgName)); - } else { - taskList.add(threadPool.submit(new Callable<Void>() { - @Override - public Void call() throws Exception { - fs.copyFromLocalFile(new Path(file.toURI()), trgName); - return null; - } - })); + File[] files = srcFile.listFiles(); + + if (files != null) { + for (final File file : files) { + final Path trgName = new Path(dstPath, file.getName()); + if (file.isDirectory()) { + taskList.addAll(copyFolderRecursively(fs, threadPool, file, trgName)); + } else { + taskList.add(threadPool.submit(new Callable<Void>() { + @Override + public Void call() throws Exception { + fs.copyFromLocalFile(new Path(file.toURI()), trgName); + return null; + } + })); + } } + } else { + System.out.println("WARNING: directory listing of " + srcFile.getAbsolutePath().toString() + " returned null"); } + return taskList; } } http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/webapp/pom.xml ---------------------------------------------------------------------- diff --git a/webapp/pom.xml b/webapp/pom.xml index e4fdfb7..4dc0c30 100644 --- a/webapp/pom.xml +++ b/webapp/pom.xml @@ -37,8 +37,8 @@ <artifactId>oozie-client</artifactId> <exclusions> <exclusion> - <groupId>org.apache.oozie</groupId> - <artifactId>oozie-hadoop-auth</artifactId> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-auth</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> @@ -273,8 +273,8 @@ <scope>compile</scope> </dependency> <dependency> - <groupId>org.apache.oozie</groupId> - <artifactId>oozie-hadoop-auth</artifactId> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-auth</artifactId> <scope>compile</scope> </dependency> <dependency>