Repository: oozie
Updated Branches:
  refs/heads/master 53b1d1e43 -> 21761f5b5


http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigActionExecutor.java
----------------------------------------------------------------------
diff --git a/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigActionExecutor.java b/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigActionExecutor.java
index 25092ce..7c3c5bb 100644
--- a/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigActionExecutor.java
+++ b/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigActionExecutor.java
@@ -18,44 +18,36 @@
 
 package org.apache.oozie.action.hadoop;
 
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.fs.Path;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.service.ConfigurationService;
-import org.apache.oozie.service.URIHandlerService;
-import org.apache.oozie.service.WorkflowAppService;
 import org.apache.oozie.service.Services;
-import org.apache.oozie.service.HadoopAccessorService;
+import org.apache.oozie.service.WorkflowAppService;
+import org.apache.oozie.util.IOUtils;
 import org.apache.oozie.util.XConfiguration;
 import org.apache.oozie.util.XmlUtils;
-import org.apache.oozie.util.IOUtils;
-import org.codehaus.jackson.JsonParser;
 import org.jdom.Element;
 import org.json.simple.JSONValue;
 import org.json.simple.parser.JSONParser;
 
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.InputStream;
-import java.io.FileInputStream;
-import java.io.Writer;
-import java.io.OutputStreamWriter;
-import java.io.StringReader;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
 public class TestPigActionExecutor extends ActionExecutorTestCase {
 
     private static final String PIG_SCRIPT = "set job.name 'test'\n" + "set debug on\n" +
@@ -144,49 +136,21 @@ public class TestPigActionExecutor extends ActionExecutorTestCase {
         return new Context(wf, action);
     }
 
-    private RunningJob submitAction(Context context) throws Exception {
+    private String submitAction(Context context) throws Exception {
         PigActionExecutor ae = new PigActionExecutor();
-
         WorkflowAction action = context.getAction();
-
         ae.prepareActionDir(getFileSystem(), context);
         ae.submitLauncher(getFileSystem(), context, action);
-
         String jobId = action.getExternalId();
-        String jobTracker = action.getTrackerUri();
-        String consoleUrl = action.getConsoleUrl();
-        assertNotNull(jobId);
-        assertNotNull(jobTracker);
-        assertNotNull(consoleUrl);
-
-        Element e = XmlUtils.parseXml(action.getConf());
-        XConfiguration conf =
-                new XConfiguration(new StringReader(XmlUtils.prettyPrint(e.getChild("configuration")).toString()));
-        conf.set("mapred.job.tracker", e.getChildTextTrim("job-tracker"));
-        conf.set("fs.default.name", e.getChildTextTrim("name-node"));
-        conf.set("mapreduce.framework.name", "yarn");
-        conf.set("user.name", context.getProtoActionConf().get("user.name"));
-        conf.set("group.name", getTestGroup());
-
-        JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
-        XConfiguration.copy(conf, jobConf);
-        String user = jobConf.get("user.name");
-        String group = jobConf.get("group.name");
-        JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf);
-        final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId));
-        assertNotNull(runningJob);
-        return runningJob;
+
+        return jobId;
     }
 
     private void _testSubmit(String actionXml, boolean checkForSuccess) throws Exception {
 
         Context context = createContext(actionXml);
-        final RunningJob launcherJob = submitAction(context);
-        String launcherId = context.getAction().getExternalId();
-        evaluateLauncherJob(launcherJob);
-        assertTrue(launcherJob.isSuccessful());
-
-        sleep(2000);
+        final String launcherId = submitAction(context);
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
 
         PigActionExecutor ae = new PigActionExecutor();
         ae.check(context, context.getAction());
@@ -222,26 +186,25 @@ public class TestPigActionExecutor extends ActionExecutorTestCase {
         // Set the action xml with the option for retrieving stats to true
         String actionXml = setPigActionXml(PIG_SCRIPT, true);
         Context context = createContext(actionXml);
-        final RunningJob launcherJob = submitAction(context);
-        evaluateLauncherJob(launcherJob);
-        assertTrue(launcherJob.isSuccessful());
+        final String launcherId = submitAction(context);
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
 
         Configuration conf = new XConfiguration();
         conf.set("user.name", getTestUser());
-        Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
+        Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(),
                 conf);
-        assertTrue(LauncherMapperHelper.hasStatsData(actionData));
+        assertTrue(LauncherHelper.hasStatsData(actionData));
 
         PigActionExecutor ae = new PigActionExecutor();
         WorkflowAction wfAction = context.getAction();
         ae.check(context, wfAction);
         ae.end(context, wfAction);
 
-        assertEquals("SUCCEEDED", wfAction.getExternalStatus());
+        assertEquals(JavaActionExecutor.SUCCEEDED, wfAction.getExternalStatus());
         String stats = wfAction.getStats();
         assertNotNull(stats);
         // check for some of the expected key values in the stats
-        Map m = (Map)JSONValue.parse(stats);
+        Map m = (Map) JSONValue.parse(stats);
         // check for expected 1st level JSON keys
         assertTrue(m.containsKey("PIG_VERSION"));
 
@@ -249,7 +212,7 @@ public class TestPigActionExecutor extends ActionExecutorTestCase {
         String[] childIDs = expectedChildIDs.split(",");
         assertTrue(m.containsKey(childIDs[0]));
 
-        Map q = (Map)m.get(childIDs[0]);
+        Map q = (Map) m.get(childIDs[0]);
         // check for expected 2nd level JSON keys
         assertTrue(q.containsKey("HADOOP_COUNTERS"));
     }
@@ -275,9 +238,8 @@ public class TestPigActionExecutor extends ActionExecutorTestCase {
         // Set the action xml with the option for retrieving stats to false
         String actionXml = setPigActionXml(PIG_SCRIPT, false);
         Context context = createContext(actionXml);
-        final RunningJob launcherJob = submitAction(context);
-        evaluateLauncherJob(launcherJob);
-        assertTrue(launcherJob.isSuccessful());
+        final String launcherId = submitAction(context);
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
 
         PigActionExecutor ae = new PigActionExecutor();
         WorkflowAction wfAction = context.getAction();
@@ -305,9 +267,8 @@ public class TestPigActionExecutor extends ActionExecutorTestCase {
         // Set the action xml with the option for retrieving stats to true
         String actionXml = setPigActionXml(PIG_SCRIPT, true);
         Context context = createContext(actionXml);
-        final RunningJob launcherJob = submitAction(context);
-        evaluateLauncherJob(launcherJob);
-        assertTrue(launcherJob.isSuccessful());
+        final String launcherId = submitAction(context);
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
 
         PigActionExecutor ae = new PigActionExecutor();
         WorkflowAction wfAction = context.getAction();
@@ -327,15 +288,14 @@ public class TestPigActionExecutor extends ActionExecutorTestCase {
         // Set the action xml with the option for retrieving stats to false
         String actionXml = setPigActionXml(PIG_SCRIPT, false);
         Context context = createContext(actionXml);
-        final RunningJob launcherJob = submitAction(context);
-        evaluateLauncherJob(launcherJob);
-        assertTrue(launcherJob.isSuccessful());
+        final String launcherId = submitAction(context);
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
 
         Configuration conf = new XConfiguration();
         conf.set("user.name", getTestUser());
-        Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
+        Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(),
                 conf);
-        assertFalse(LauncherMapperHelper.hasStatsData(actionData));
+        assertFalse(LauncherHelper.hasStatsData(actionData));
 
         PigActionExecutor ae = new PigActionExecutor();
         WorkflowAction wfAction = context.getAction();
@@ -346,16 +306,6 @@ public class TestPigActionExecutor extends ActionExecutorTestCase {
         assertNotNull(wfAction.getExternalChildIDs());
     }
 
-    private void evaluateLauncherJob(final RunningJob launcherJob) throws Exception{
-        waitFor(180 * 1000, new Predicate() {
-            @Override
-            public boolean evaluate() throws Exception {
-                return launcherJob.isComplete();
-            }
-        });
-        sleep(2000);
-    }
-
     protected XConfiguration setPigConfig(boolean writeStats) {
         XConfiguration conf = new XConfiguration();
         conf.set("oozie.pig.log.level", "INFO");
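
The test-side submission pattern above now reduces to "submit the launcher, then wait on its YARN application id". A minimal sketch of such a helper, as it could sit inside a test class extending ActionExecutorTestCase (the combined helper name submitAndWait is illustrative, not part of this change; it only uses calls visible in this diff):

    // Sketch only: submit the Pig action's launcher and block until its YARN application
    // finishes, asserting that it succeeded.
    private String submitAndWait(Context context) throws Exception {
        PigActionExecutor ae = new PigActionExecutor();
        ae.prepareActionDir(getFileSystem(), context);
        ae.submitLauncher(getFileSystem(), context, context.getAction());
        String launcherId = context.getAction().getExternalId();  // launcher's YARN application id
        waitUntilYarnAppDoneAndAssertSuccess(launcherId);          // helper provided by the test base class
        return launcherId;
    }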

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigMainWithOldAPI.java
----------------------------------------------------------------------
diff --git a/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigMainWithOldAPI.java b/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigMainWithOldAPI.java
index e52e6fd..74de433 100644
--- a/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigMainWithOldAPI.java
+++ b/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigMainWithOldAPI.java
@@ -21,7 +21,6 @@ package org.apache.oozie.action.hadoop;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.oozie.action.hadoop.MainTestCase;
-import org.apache.oozie.action.hadoop.MapReduceMain;
 import org.apache.oozie.action.hadoop.PigMainWithOldAPI;
 import org.apache.oozie.action.hadoop.SharelibUtils;
 import org.apache.oozie.test.XFsTestCase;
@@ -97,9 +96,9 @@ public class TestPigMainWithOldAPI extends XFsTestCase implements Callable<Void>
         SharelibUtils.addToDistributedCache("pig", fs, getFsTestCaseDir(), jobConfiguration);
 
         String[] params = { "IN=" + inputDir.toUri().getPath(), "OUT=" + outputDir.toUri().getPath() };
-        MapReduceMain.setStrings(jobConfiguration, "oozie.pig.params", params);
+        ActionUtils.setStrings(jobConfiguration, "oozie.pig.params", params);
         String[] args = { "-v" };
-        MapReduceMain.setStrings(jobConfiguration, "oozie.pig.args", args);
+        ActionUtils.setStrings(jobConfiguration, "oozie.pig.args", args);
 
         File actionXml = new File(getTestCaseDir(), "action.xml");
         OutputStream os = new FileOutputStream(actionXml);
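
The only functional change in this file is the helper used to pack string arrays into the action configuration. Assuming ActionUtils.setStrings/getStrings keep the contract the old MapReduceMain helpers exposed (a String[] written under a single key and read back symmetrically), a minimal usage sketch is:

    // Sketch only; the property values below are illustrative, not taken from this change.
    org.apache.hadoop.conf.Configuration jobConfiguration = new org.apache.hadoop.conf.Configuration(false);
    String[] params = { "IN=/tmp/in", "OUT=/tmp/out" };
    ActionUtils.setStrings(jobConfiguration, "oozie.pig.params", params);
    String[] roundTripped = ActionUtils.getStrings(jobConfiguration, "oozie.pig.params");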

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/spark/pom.xml
----------------------------------------------------------------------
diff --git a/sharelib/spark/pom.xml b/sharelib/spark/pom.xml
index abef24f..dec505b 100644
--- a/sharelib/spark/pom.xml
+++ b/sharelib/spark/pom.xml
@@ -248,6 +248,103 @@
             <artifactId>oozie-examples</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-yarn_${spark.scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>compile</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-yarn-client</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-yarn-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-yarn-common</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-yarn-server-common</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-yarn-server-web-proxy</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-mapreduce-client-shuffle</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-mapreduce-client-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-mapreduce-client-common</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-mapreduce-client-app</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-annotations</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-auth</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-aws</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-client</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.spark-project.hive</groupId>
+                    <artifactId>hive-beeline</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.spark-project.hive</groupId>
+                    <artifactId>hive-common</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.spark-project.hive</groupId>
+                    <artifactId>hive-exec</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.spark-project.hive</groupId>
+                    <artifactId>hive-jdbc</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.spark-project.hive</groupId>
+                    <artifactId>hive-metastore</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.spark-project.hive</groupId>
+                    <artifactId>hive-serde</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.spark-project.hive</groupId>
+                    <artifactId>hive-service</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.spark-project.hive</groupId>
+                    <artifactId>hive-shims</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
     </dependencies>
 
     <build>
@@ -273,37 +370,6 @@
                             <outputFile>${project.build.directory}/classpath</outputFile>
                         </configuration>
                     </execution>
-                    <execution>
-                        <id>create-mrapp-generated-classpath</id>
-                        <phase>generate-test-resources</phase>
-                        <goals>
-                            <goal>build-classpath</goal>
-                        </goals>
-                        <configuration>
-                            <!-- needed to run the unit test for DS to generate the required classpath
-                            that is required in the env of the launch container in the mini mr/yarn cluster -->
-                            <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-antrun-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <configuration>
-                            <target>
-                                <!-- needed to include Main class in classpath for mini yarn cluster for unit tests -->
-                                <echo file="${project.build.directory}/test-classes/mrapp-generated-classpath"
-                                      append="true" message=":${project.build.directory}/classes"/>
-                            </target>
-                        </configuration>
-                        <goals>
-                            <goal>run</goal>
-                        </goals>
-                        <phase>generate-test-resources</phase>
-                    </execution>
                 </executions>
             </plugin>
             <plugin>
@@ -319,109 +385,5 @@
             </plugin>
         </plugins>
     </build>
-
-    <profiles>
-        <profile>
-            <id>hadoop-2</id>
-            <dependencies>
-                <dependency>
-                    <groupId>org.apache.spark</groupId>
-                    <artifactId>spark-yarn_${spark.scala.binary.version}</artifactId>
-                    <version>${spark.version}</version>
-                    <scope>compile</scope>
-                    <exclusions>
-                        <exclusion>
-                            <groupId>org.apache.hadoop</groupId>
-                            <artifactId>hadoop-yarn-client</artifactId>
-                        </exclusion>
-                        <exclusion>
-                            <groupId>org.apache.hadoop</groupId>
-                            <artifactId>hadoop-yarn-api</artifactId>
-                        </exclusion>
-                        <exclusion>
-                            <groupId>org.apache.hadoop</groupId>
-                            <artifactId>hadoop-yarn-common</artifactId>
-                        </exclusion>
-                        <exclusion>
-                            <groupId>org.apache.hadoop</groupId>
-                            <artifactId>hadoop-yarn-server-common</artifactId>
-                        </exclusion>
-                        <exclusion>
-                            <groupId>org.apache.hadoop</groupId>
-                            <artifactId>hadoop-yarn-server-web-proxy</artifactId>
-                        </exclusion>
-                        <exclusion>
-                            <groupId>org.apache.hadoop</groupId>
-                            <artifactId>hadoop-mapreduce-client-shuffle</artifactId>
-                        </exclusion>
-                        <exclusion>
-                            <groupId>org.apache.hadoop</groupId>
-                            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
-                        </exclusion>
-                        <exclusion>
-                            <groupId>org.apache.hadoop</groupId>
-                            <artifactId>hadoop-mapreduce-client-core</artifactId>
-                        </exclusion>
-                        <exclusion>
-                            <groupId>org.apache.hadoop</groupId>
-                            <artifactId>hadoop-mapreduce-client-common</artifactId>
-                        </exclusion>
-                        <exclusion>
-                            <groupId>org.apache.hadoop</groupId>
-                            <artifactId>hadoop-mapreduce-client-app</artifactId>
-                        </exclusion>
-                        <exclusion>
-                            <groupId>org.apache.hadoop</groupId>
-                            <artifactId>hadoop-annotations</artifactId>
-                        </exclusion>
-                        <exclusion>
-                            <groupId>org.apache.hadoop</groupId>
-                            <artifactId>hadoop-auth</artifactId>
-                        </exclusion>
-                        <exclusion>
-                            <groupId>org.apache.hadoop</groupId>
-                            <artifactId>hadoop-aws</artifactId>
-                        </exclusion>
-                        <exclusion>
-                            <groupId>org.apache.hadoop</groupId>
-                            <artifactId>hadoop-client</artifactId>
-                        </exclusion>
-                        <exclusion>
-                            <groupId>org.spark-project.hive</groupId>
-                            <artifactId>hive-beeline</artifactId>
-                        </exclusion>
-                        <exclusion>
-                            <groupId>org.spark-project.hive</groupId>
-                            <artifactId>hive-common</artifactId>
-                        </exclusion>
-                        <exclusion>
-                            <groupId>org.spark-project.hive</groupId>
-                            <artifactId>hive-exec</artifactId>
-                        </exclusion>
-                        <exclusion>
-                            <groupId>org.spark-project.hive</groupId>
-                            <artifactId>hive-jdbc</artifactId>
-                        </exclusion>
-                        <exclusion>
-                            <groupId>org.spark-project.hive</groupId>
-                            <artifactId>hive-metastore</artifactId>
-                        </exclusion>
-                        <exclusion>
-                            <groupId>org.spark-project.hive</groupId>
-                            <artifactId>hive-serde</artifactId>
-                        </exclusion>
-                        <exclusion>
-                            <groupId>org.spark-project.hive</groupId>
-                            <artifactId>hive-service</artifactId>
-                        </exclusion>
-                        <exclusion>
-                            <groupId>org.spark-project.hive</groupId>
-                            <artifactId>hive-shims</artifactId>
-                        </exclusion>
-                    </exclusions>
-                </dependency>
-            </dependencies>
-        </profile>
-    </profiles>
 </project>
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
index ffa934a..68f7a60 100644
--- a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
+++ b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
@@ -91,7 +91,7 @@ public class SparkMain extends LauncherMain {
         prepareHadoopConfig(actionConf);
 
         setYarnTag(actionConf);
-        LauncherMainHadoopUtils.killChildYarnJobs(actionConf);
+        LauncherMain.killChildYarnJobs(actionConf);
         String logFile = setUpSparkLog4J(actionConf);
         setHiveSite(actionConf);
         List<String> sparkArgs = new ArrayList<String>();
@@ -361,11 +361,16 @@ public class SparkMain extends LauncherMain {
      */
     private static File getMatchingFile(Pattern fileNamePattern) throws OozieActionConfiguratorException {
         File localDir = new File(".");
-        for(String fileName : localDir.list()){
-            if(fileNamePattern.matcher(fileName).find()){
-                return new File(fileName);
+        String[] files = localDir.list();
+
+        if (files != null) {
+            for(String fileName : files){
+                if(fileNamePattern.matcher(fileName).find()){
+                    return new File(fileName);
+                }
             }
         }
+
         return null;
     }
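
The added null check matters because java.io.File.list() returns null (rather than an empty array) when the path is not a directory or cannot be read. A self-contained sketch of the same defensive scan, with a hypothetical example pattern:

    import java.io.File;
    import java.util.regex.Pattern;

    public class MatchingFileScan {
        // Returns the first file in the current working directory whose name matches the
        // pattern, or null when nothing matches or the directory cannot be listed.
        static File firstMatch(Pattern fileNamePattern) {
            String[] files = new File(".").list();  // may be null
            if (files != null) {
                for (String fileName : files) {
                    if (fileNamePattern.matcher(fileName).find()) {
                        return new File(fileName);
                    }
                }
            }
            return null;
        }

        public static void main(String[] args) {
            System.out.println(firstMatch(Pattern.compile("\\.jar$")));  // example pattern only
        }
    }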
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestPyspark.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestPyspark.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestPyspark.java
index 458baaa..9d8d4aa 100644
--- a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestPyspark.java
+++ b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestPyspark.java
@@ -106,13 +106,8 @@ public class TestPyspark extends ActionExecutorTestCase {
             WorkflowAction.Status wfStatus)
             throws Exception {
         Context context = createContext(getActionXml(sparkOpts), wf);
-        final RunningJob launcherJob = submitAction(context);
-        waitFor(200 * 1000, new Predicate() {
-            public boolean evaluate() throws Exception {
-                return launcherJob.isComplete();
-            }
-        });
-        assertTrue(launcherJob.isSuccessful());
+        final String launcherId = submitAction(context);
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
         SparkActionExecutor ae = new SparkActionExecutor();
         ae.check(context, context.getAction());
         assertEquals(externalStatus, context.getAction().getExternalStatus());
@@ -120,7 +115,7 @@ public class TestPyspark extends ActionExecutorTestCase {
         assertEquals(wfStatus, context.getAction().getStatus());
     }
 
-    protected RunningJob submitAction(Context context) throws Exception {
+    protected String submitAction(Context context) throws Exception {
         SparkActionExecutor ae = new SparkActionExecutor();
         WorkflowAction action = context.getAction();
         ae.prepareActionDir(getFileSystem(), context);
@@ -131,12 +126,8 @@ public class TestPyspark extends ActionExecutorTestCase {
         assertNotNull(jobId);
         assertNotNull(jobTracker);
         assertNotNull(consoleUrl);
-        JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
-        jobConf.set("mapred.job.tracker", jobTracker);
-        JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(getTestUser(), jobConf);
-        final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId));
-        assertNotNull(runningJob);
-        return runningJob;
+
+        return jobId;
     }
 
     protected Context createContext(String actionXml, WorkflowJobBean wf) throws Exception {

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java
index 8c77be0..d97f1f0 100644
--- a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java
+++ b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.client.WorkflowAction;
@@ -175,13 +176,8 @@ public class TestSparkActionExecutor extends ActionExecutorTestCase {
         scriptWriter.close();
 
         Context context = createContext(getActionXml());
-        final RunningJob launcherJob = submitAction(context);
-        waitFor(200 * 1000, new Predicate() {
-            public boolean evaluate() throws Exception {
-                return launcherJob.isComplete();
-            }
-        });
-        assertTrue(launcherJob.isSuccessful());
+        final String launcherID = submitAction(context);
+        waitUntilYarnAppDoneAndAssertSuccess(launcherID);
 
         SparkActionExecutor ae = new SparkActionExecutor();
         ae.check(context, context.getAction());
@@ -212,7 +208,7 @@ public class TestSparkActionExecutor extends ActionExecutorTestCase {
         return new Context(wf, action);
     }
 
-    protected RunningJob submitAction(Context context) throws Exception {
+    protected String submitAction(Context context) throws Exception {
         SparkActionExecutor ae = new SparkActionExecutor();
 
         WorkflowAction action = context.getAction();
@@ -227,14 +223,7 @@ public class TestSparkActionExecutor extends ActionExecutorTestCase {
         assertNotNull(jobTracker);
         assertNotNull(consoleUrl);
 
-        JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
-        jobConf.set("mapred.job.tracker", jobTracker);
-
-        JobClient jobClient =
-                Services.get().get(HadoopAccessorService.class).createJobClient(getTestUser(), jobConf);
-        final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId));
-        assertNotNull(runningJob);
-        return runningJob;
+        return jobId;
     }
 
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/sqoop/pom.xml
----------------------------------------------------------------------
diff --git a/sharelib/sqoop/pom.xml b/sharelib/sqoop/pom.xml
index d5afa37..11727de 100644
--- a/sharelib/sqoop/pom.xml
+++ b/sharelib/sqoop/pom.xml
@@ -214,11 +214,6 @@
             <artifactId>oozie-sharelib-oozie</artifactId>
             <scope>provided</scope>
         </dependency>
-        <dependency>
-            <groupId>org.apache.oozie</groupId>
-            <artifactId>oozie-hadoop-utils</artifactId>
-            <scope>provided</scope>
-        </dependency>
     </dependencies>
 
     <build>
@@ -244,18 +239,6 @@
                             <outputFile>${project.build.directory}/classpath</outputFile>
                         </configuration>
                     </execution>
-                    <execution>
-                        <id>create-mrapp-generated-classpath</id>
-                        <phase>generate-test-resources</phase>
-                        <goals>
-                            <goal>build-classpath</goal>
-                        </goals>
-                        <configuration>
-                            <!-- needed to run the unit test for DS to generate the required classpath
-                            that is required in the env of the launch container in the mini mr/yarn cluster -->
-                            <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
-                        </configuration>
-                    </execution>
                 </executions>
             </plugin>
             <plugin>

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/sqoop/src/main/java/org/apache/oozie/action/hadoop/SqoopMain.java
----------------------------------------------------------------------
diff --git a/sharelib/sqoop/src/main/java/org/apache/oozie/action/hadoop/SqoopMain.java b/sharelib/sqoop/src/main/java/org/apache/oozie/action/hadoop/SqoopMain.java
index 6672ffb..416f1ec 100644
--- a/sharelib/sqoop/src/main/java/org/apache/oozie/action/hadoop/SqoopMain.java
+++ b/sharelib/sqoop/src/main/java/org/apache/oozie/action/hadoop/SqoopMain.java
@@ -152,13 +152,13 @@ public class SqoopMain extends LauncherMain {
         Configuration sqoopConf = setUpSqoopSite();
         String logFile = setUpSqoopLog4J(sqoopConf);
 
-        String[] sqoopArgs = MapReduceMain.getStrings(sqoopConf, SqoopActionExecutor.SQOOP_ARGS);
+        String[] sqoopArgs = ActionUtils.getStrings(sqoopConf, SqoopActionExecutor.SQOOP_ARGS);
         if (sqoopArgs == null) {
             throw new RuntimeException("Action Configuration does not have [" + SqoopActionExecutor.SQOOP_ARGS + "] property");
         }
 
-        LauncherMapper.printArgs("Sqoop command arguments :", sqoopArgs);
-        LauncherMainHadoopUtils.killChildYarnJobs(sqoopConf);
+        printArgs("Sqoop command arguments :", sqoopArgs);
+        LauncherMain.killChildYarnJobs(sqoopConf);

         System.out.println("=================================================================");
         System.out.println();
@@ -169,13 +169,6 @@ public class SqoopMain extends LauncherMain {
         try {
             runSqoopJob(sqoopArgs);
         }
-        catch (SecurityException ex) {
-            if (LauncherSecurityManager.getExitInvoked()) {
-                if (LauncherSecurityManager.getExitCode() != 0) {
-                    throw ex;
-                }
-            }
-        }
         finally {
             System.out.println("\n<<< Invocation of Sqoop command completed <<<\n");
             writeExternalChildIDs(logFile, SQOOP_JOB_IDS_PATTERNS, "Sqoop");

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/sqoop/src/test/java/org/apache/oozie/action/hadoop/TestSqoopActionExecutor.java
----------------------------------------------------------------------
diff --git a/sharelib/sqoop/src/test/java/org/apache/oozie/action/hadoop/TestSqoopActionExecutor.java b/sharelib/sqoop/src/test/java/org/apache/oozie/action/hadoop/TestSqoopActionExecutor.java
index 3dfd606..edfe0c7 100644
--- a/sharelib/sqoop/src/test/java/org/apache/oozie/action/hadoop/TestSqoopActionExecutor.java
+++ b/sharelib/sqoop/src/test/java/org/apache/oozie/action/hadoop/TestSqoopActionExecutor.java
@@ -22,21 +22,12 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.RunningJob;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.client.WorkflowAction;
-import org.apache.oozie.service.HadoopAccessorService;
-import org.apache.oozie.service.Services;
 import org.apache.oozie.service.WorkflowAppService;
 import org.apache.oozie.util.IOUtils;
 import org.apache.oozie.util.XConfiguration;
-import org.apache.oozie.util.XmlUtils;
-import org.jdom.Element;
-import org.jdom.Namespace;
 
 import java.io.BufferedReader;
 import java.io.File;
@@ -44,7 +35,6 @@ import java.io.FileInputStream;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
-import java.io.StringReader;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.Statement;
@@ -209,17 +199,12 @@ public class TestSqoopActionExecutor extends ActionExecutorTestCase {
         createDB();
 
         Context context = createContext(actionXml);
-        final RunningJob launcherJob = submitAction(context);
-        String launcherId = context.getAction().getExternalId();
-        waitFor(120 * 1000, new Predicate() {
-            public boolean evaluate() throws Exception {
-                return launcherJob.isComplete();
-            }
-        });
-        assertTrue(launcherJob.isSuccessful());
-        Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
+        final String launcherId = submitAction(context);
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
+
+        Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(),
                 context.getProtoActionConf());
-        assertFalse(LauncherMapperHelper.hasIdSwap(actionData));
+        assertFalse(LauncherHelper.hasIdSwap(actionData));
 
         SqoopActionExecutor ae = new SqoopActionExecutor();
         ae.check(context, context.getAction());
@@ -248,17 +233,11 @@ public class TestSqoopActionExecutor extends ActionExecutorTestCase {
         createDB();
 
         Context context = createContext(actionXml);
-        final RunningJob launcherJob = submitAction(context);
-        String launcherId = context.getAction().getExternalId();
-        waitFor(120 * 1000, new Predicate() {
-            public boolean evaluate() throws Exception {
-                return launcherJob.isComplete();
-            }
-        });
-        assertTrue(launcherJob.isSuccessful());
-        Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
+        final String launcherId = submitAction(context);
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
+        Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(),
                 context.getProtoActionConf());
-        assertFalse(LauncherMapperHelper.hasIdSwap(actionData));
+        assertFalse(LauncherHelper.hasIdSwap(actionData));
 
         SqoopActionExecutor ae = new SqoopActionExecutor();
         ae.check(context, context.getAction());
@@ -289,17 +268,11 @@ public class TestSqoopActionExecutor extends ActionExecutorTestCase {
         createDB();
 
         Context context = createContext(getActionXmlEval());
-        final RunningJob launcherJob = submitAction(context);
-        String launcherId = context.getAction().getExternalId();
-        waitFor(120 * 1000, new Predicate() {
-            public boolean evaluate() throws Exception {
-                return launcherJob.isComplete();
-            }
-        });
-        assertTrue(launcherJob.isSuccessful());
-        Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
+        final String launcherId = submitAction(context);
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
+        Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(),
                 context.getProtoActionConf());
-        assertFalse(LauncherMapperHelper.hasIdSwap(actionData));
+        assertFalse(LauncherHelper.hasIdSwap(actionData));
 
         SqoopActionExecutor ae = new SqoopActionExecutor();
         ae.check(context, context.getAction());
@@ -341,17 +314,11 @@ public class TestSqoopActionExecutor extends ActionExecutorTestCase {
         createDB();
 
         Context context = createContext(actionXml);
-        final RunningJob launcherJob = submitAction(context);
-        String launcherId = context.getAction().getExternalId();
-        waitFor(120 * 1000, new Predicate() {
-            public boolean evaluate() throws Exception {
-                return launcherJob.isComplete();
-            }
-        });
-        assertTrue(launcherJob.isSuccessful());
-        Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
+        final String launcherId = submitAction(context);
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
+        Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(),
                 context.getProtoActionConf());
-        assertFalse(LauncherMapperHelper.hasIdSwap(actionData));
+        assertFalse(LauncherHelper.hasIdSwap(actionData));
 
         SqoopActionExecutor ae = new SqoopActionExecutor();
         ae.check(context, context.getAction());
@@ -387,7 +354,7 @@ public class TestSqoopActionExecutor extends ActionExecutorTestCase {
     }
 
 
-    private RunningJob submitAction(Context context) throws Exception {
+    private String submitAction(Context context) throws Exception {
         SqoopActionExecutor ae = new SqoopActionExecutor();
 
         WorkflowAction action = context.getAction();
@@ -401,24 +368,7 @@ public class TestSqoopActionExecutor extends ActionExecutorTestCase {
         assertNotNull(jobId);
         assertNotNull(jobTracker);
         assertNotNull(consoleUrl);
-        Element e = XmlUtils.parseXml(action.getConf());
-        Namespace ns = Namespace.getNamespace("uri:oozie:sqoop-action:0.1");
-        XConfiguration conf = new XConfiguration(
-                new StringReader(XmlUtils.prettyPrint(e.getChild("configuration", ns)).toString()));
-        conf.set("mapred.job.tracker", e.getChildTextTrim("job-tracker", ns));
-        conf.set("fs.default.name", e.getChildTextTrim("name-node", ns));
-        conf.set("user.name", context.getProtoActionConf().get("user.name"));
-        conf.set("mapreduce.framework.name", "yarn");
-        conf.set("group.name", getTestGroup());
-
-        JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
-        XConfiguration.copy(conf, jobConf);
-        String user = jobConf.get("user.name");
-        String group = jobConf.get("group.name");
-        JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf);
-        final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId));
-        assertNotNull(runningJob);
-        return runningJob;
+        return jobId;
     }
 
     private Context createContext(String actionXml) throws Exception {

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/streaming/pom.xml
----------------------------------------------------------------------
diff --git a/sharelib/streaming/pom.xml b/sharelib/streaming/pom.xml
index 4f73272..d65c396 100644
--- a/sharelib/streaming/pom.xml
+++ b/sharelib/streaming/pom.xml
@@ -62,6 +62,12 @@
         </dependency>
 
         <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-client</artifactId>
             <scope>provided</scope>

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/streaming/src/main/java/org/apache/oozie/action/hadoop/StreamingMain.java
----------------------------------------------------------------------
diff --git a/sharelib/streaming/src/main/java/org/apache/oozie/action/hadoop/StreamingMain.java b/sharelib/streaming/src/main/java/org/apache/oozie/action/hadoop/StreamingMain.java
index 991bf7e..cc55166 100644
--- a/sharelib/streaming/src/main/java/org/apache/oozie/action/hadoop/StreamingMain.java
+++ b/sharelib/streaming/src/main/java/org/apache/oozie/action/hadoop/StreamingMain.java
@@ -56,12 +56,12 @@ public class StreamingMain extends MapReduceMain {
         if (value != null) {
             jobConf.set("stream.recordreader.class", value);
         }
-        String[] values = getStrings(actionConf, "oozie.streaming.record-reader-mapping");
+        String[] values = ActionUtils.getStrings(actionConf, "oozie.streaming.record-reader-mapping");
         for (String s : values) {
             String[] kv = s.split("=");
             jobConf.set("stream.recordreader." + kv[0], kv[1]);
         }
-        values = getStrings(actionConf, "oozie.streaming.env");
+        values = ActionUtils.getStrings(actionConf, "oozie.streaming.env");
         value = jobConf.get("stream.addenvironment", "");
         if (value.length() > 0) {
             value = value + " ";

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
----------------------------------------------------------------------
diff --git a/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java b/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
index 5f9f38e..045f174 100644
--- a/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
+++ b/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
@@ -18,46 +18,17 @@
 
 package org.apache.oozie.action.hadoop;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.streaming.StreamJob;
-import org.apache.oozie.WorkflowActionBean;
-import org.apache.oozie.WorkflowJobBean;
-import org.apache.oozie.client.OozieClient;
-import org.apache.oozie.client.WorkflowAction;
-import org.apache.oozie.client.WorkflowAction.Status;
-import org.apache.oozie.command.wf.StartXCommand;
-import org.apache.oozie.command.wf.SubmitXCommand;
-import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
-import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
-import org.apache.oozie.service.WorkflowAppService;
-import org.apache.oozie.service.Services;
-import org.apache.oozie.service.HadoopAccessorService;
-import org.apache.oozie.util.XConfiguration;
-import org.apache.oozie.util.XmlUtils;
-import org.apache.oozie.util.IOUtils;
-import org.apache.oozie.util.ClassUtils;
-import org.jdom.Element;
-
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
 import java.io.FileWriter;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.io.InputStream;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.Writer;
+import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 import java.io.StringReader;
+import java.io.Writer;
 import java.net.URI;
 import java.util.Arrays;
 import java.util.List;
@@ -68,14 +39,45 @@ import java.util.jar.JarOutputStream;
 import java.util.regex.Pattern;
 import java.util.zip.ZipEntry;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.streaming.StreamJob;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.action.ActionExecutorException;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.client.WorkflowAction.Status;
+import org.apache.oozie.command.wf.StartXCommand;
+import org.apache.oozie.command.wf.SubmitXCommand;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
+import org.apache.oozie.service.HadoopAccessorService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.WorkflowAppService;
+import org.apache.oozie.util.ClassUtils;
+import org.apache.oozie.util.IOUtils;
 import org.apache.oozie.util.PropertiesUtils;
+import org.apache.oozie.util.XConfiguration;
+import org.apache.oozie.util.XmlUtils;
+import org.jdom.Element;
 
 public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
 
+    private static final String PIPES = "pipes";
+    private static final String MAP_REDUCE = "map-reduce";
+
     @Override
     protected void setSystemProps() throws Exception {
         super.setSystemProps();
@@ -212,10 +214,10 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
          assertEquals("global-output-dir", actionConf.get("outputDir"));
     }
 
-    @SuppressWarnings("unchecked")
     public void testSetupMethods() throws Exception {
         MapReduceActionExecutor ae = new MapReduceActionExecutor();
-        assertEquals(Arrays.asList(StreamingMain.class), ae.getLauncherClasses());
+        List<Class<?>> classes = Arrays.<Class<?>>asList(StreamingMain.class);
+        assertEquals(classes, ae.getLauncherClasses());
 
         Element actionXml = XmlUtils.parseXml("<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>"
                 + "<name-node>" + getNameNodeUri() + "</name-node>" + "<configuration>"
@@ -226,7 +228,6 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
         XConfiguration protoConf = new XConfiguration();
         protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
 
-
         WorkflowJobBean wf = createBaseWorkflow(protoConf, "mr-action");
         WorkflowActionBean action = (WorkflowActionBean) wf.getActions().get(0);
         action.setType(ae.getType());
@@ -236,7 +237,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
         Configuration conf = ae.createBaseHadoopConf(context, actionXml);
         ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir());
         assertEquals("IN", conf.get("mapred.input.dir"));
-        JobConf launcherJobConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml, conf);
+        Configuration launcherJobConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml, conf);
         assertEquals(false, launcherJobConf.getBoolean("mapreduce.job.complete.cancel.delegation.tokens", true));
         assertEquals(true, conf.getBoolean("mapreduce.job.complete.cancel.delegation.tokens", false));
 
@@ -248,45 +249,36 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
         actionXml = createUberJarActionXML(getNameNodeUri() + "/app/job.jar", "");
         conf = ae.createBaseHadoopConf(context, actionXml);
         ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir());
-        assertEquals(getNameNodeUri() + "/app/job.jar", conf.get("oozie.mapreduce.uber.jar"));  // absolute path with namenode
-        launcherJobConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml, conf);
-        assertEquals(getNameNodeUri() + "/app/job.jar", launcherJobConf.getJar());              // same for launcher conf
+        // absolute path with namenode
+        assertEquals(getNameNodeUri() + "/app/job.jar", conf.get(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR));
 
         actionXml = createUberJarActionXML("/app/job.jar", "");
         conf = ae.createBaseHadoopConf(context, actionXml);
         ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir());
-        assertEquals(getNameNodeUri() + "/app/job.jar", conf.get("oozie.mapreduce.uber.jar"));  // absolute path without namenode
-        launcherJobConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml, conf);
-        assertEquals(getNameNodeUri() + "/app/job.jar", launcherJobConf.getJar());              // same for launcher conf
+        // absolute path without namenode
+        assertEquals(getNameNodeUri() + "/app/job.jar", conf.get(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR));
 
         actionXml = createUberJarActionXML("job.jar", "");
         conf = ae.createBaseHadoopConf(context, actionXml);
         ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir());
-        assertEquals(getFsTestCaseDir() + "/job.jar", conf.get("oozie.mapreduce.uber.jar"));    // relative path
-        launcherJobConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml, conf);
-        assertEquals(getFsTestCaseDir() + "/job.jar", launcherJobConf.getJar());                // same for launcher
+        assertEquals(getFsTestCaseDir() + "/job.jar", conf.get(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR)); // relative path
 
         actionXml = createUberJarActionXML("job.jar", "<streaming></streaming>");
         conf = ae.createBaseHadoopConf(context, actionXml);
         ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir());
-        assertEquals("", conf.get("oozie.mapreduce.uber.jar"));                                 // ignored for streaming
-        launcherJobConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml, conf);
-        assertNull(launcherJobConf.getJar());                                                   // same for launcher conf (not set)
+        // ignored for streaming
+        assertEquals("", conf.get(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR));
+        // ignored for streaming
+        assertEquals("", conf.get(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR));
 
         actionXml = createUberJarActionXML("job.jar", "<pipes></pipes>");
         conf = ae.createBaseHadoopConf(context, actionXml);
         ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir());
         assertEquals("", conf.get("oozie.mapreduce.uber.jar"));                                 // ignored for pipes
-        launcherJobConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml, conf);
-        assertNull(launcherJobConf.getJar());                                                   // same for launcher conf (not set)
 
         actionXml = XmlUtils.parseXml("<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>"
                 + "<name-node>" + getNameNodeUri() + "</name-node>" + "</map-reduce>");
         conf = ae.createBaseHadoopConf(context, actionXml);
         ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir());
         assertNull(conf.get("oozie.mapreduce.uber.jar"));                                       // doesn't resolve if not set
-        launcherJobConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml, conf);
-        assertNull(launcherJobConf.getJar());                                                   // same for launcher conf
 
         // Disable uber jars to test that MapReduceActionExecutor won't allow the oozie.mapreduce.uber.jar property
         serv.getConf().setBoolean("oozie.action.mapreduce.uber.jar.enable", false);
@@ -386,7 +378,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
         return new Context(wf, action);
     }
 
-    protected RunningJob submitAction(Context context) throws Exception {
+    protected String submitAction(Context context) throws Exception {
         MapReduceActionExecutor ae = new MapReduceActionExecutor();
 
         WorkflowAction action = context.getAction();
@@ -394,54 +386,25 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
         ae.prepareActionDir(getFileSystem(), context);
         ae.submitLauncher(getFileSystem(), context, action);
 
-        String jobId = action.getExternalId();
-        String jobTracker = action.getTrackerUri();
-        String consoleUrl = action.getConsoleUrl();
-        assertNotNull(jobId);
-        assertNotNull(jobTracker);
-        assertNotNull(consoleUrl);
-
-        Element e = XmlUtils.parseXml(action.getConf());
-        XConfiguration conf = new XConfiguration(new 
StringReader(XmlUtils.prettyPrint(e.getChild("configuration"))
-                .toString()));
-        conf.set("mapred.job.tracker", e.getChildTextTrim("job-tracker"));
-        conf.set("fs.default.name", e.getChildTextTrim("name-node"));
-        conf.set("user.name", context.getProtoActionConf().get("user.name"));
-        conf.set("group.name", getTestGroup());
-
-        conf.set("mapreduce.framework.name", "yarn");
-        JobConf jobConf = 
Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
-        XConfiguration.copy(conf, jobConf);
-        String user = jobConf.get("user.name");
-        String group = jobConf.get("group.name");
-        JobClient jobClient = 
Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf);
-        final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId));
-        assertNotNull(runningJob);
-        return runningJob;
+        return context.getAction().getExternalId();
     }
 
     private String _testSubmit(String name, String actionXml) throws Exception 
{
 
         Context context = createContext(name, actionXml);
-        final RunningJob launcherJob = submitAction(context);
-        String launcherId = context.getAction().getExternalId();
-        waitFor(120 * 2000, new Predicate() {
-            public boolean evaluate() throws Exception {
-                return launcherJob.isComplete();
-            }
-        });
-        assertTrue(launcherJob.isSuccessful());
-        Map<String, String> actionData = 
LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
+        final String launcherId = submitAction(context);
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
+
+        Map<String, String> actionData = 
LauncherHelper.getActionData(getFileSystem(), context.getActionDir(),
                 context.getProtoActionConf());
-        assertTrue(LauncherMapperHelper.hasIdSwap(actionData));
+        assertTrue(LauncherHelper.hasIdSwap(actionData));
 
         MapReduceActionExecutor ae = new MapReduceActionExecutor();
         ae.check(context, context.getAction());
         assertTrue(launcherId.equals(context.getAction().getExternalId()));
 
-        JobConf conf = ae.createBaseHadoopConf(context, 
XmlUtils.parseXml(actionXml));
+        Configuration conf = ae.createBaseHadoopConf(context, 
XmlUtils.parseXml(actionXml));
         String user = conf.get("user.name");
-        String group = conf.get("group.name");
         JobClient jobClient = 
Services.get().get(HadoopAccessorService.class).createJobClient(user, conf);
         final RunningJob mrJob = 
jobClient.getJob(JobID.forName(context.getAction().getExternalChildIDs()));
 
@@ -453,7 +416,7 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
         assertTrue(mrJob.isSuccessful());
         ae.check(context, context.getAction());
 
-        assertEquals("SUCCEEDED", context.getAction().getExternalStatus());
+        assertEquals(JavaActionExecutor.SUCCEEDED, 
context.getAction().getExternalStatus());
         assertNull(context.getAction().getData());
 
         ae.end(context, context.getAction());
@@ -471,28 +434,37 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
         return mrJob.getID().toString();
     }
 
+    private void _testSubmitError(String actionXml, String errorMessage) 
throws Exception {
+        Context context = createContext(MAP_REDUCE, actionXml);
+        final String launcherId = submitAction(context);
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
+
+        MapReduceActionExecutor ae = new MapReduceActionExecutor();
+        ae.check(context, context.getAction());
+
+        assertEquals(JavaActionExecutor.FAILED_KILLED, 
context.getAction().getExternalStatus());
+
+        ae.end(context, context.getAction());
+        assertEquals(WorkflowAction.Status.ERROR, 
context.getAction().getStatus());
+        assertTrue(context.getAction().getErrorMessage().contains("already 
exists"));
+    }
+
     private void _testSubmitWithCredentials(String name, String actionXml) 
throws Exception {
 
-        Context context = createContextWithCredentials("map-reduce", 
actionXml);
-        final RunningJob launcherJob = submitAction(context);
-        String launcherId = context.getAction().getExternalId();
-        waitFor(120 * 1000, new Predicate() {
-            public boolean evaluate() throws Exception {
-                return launcherJob.isComplete();
-            }
-        });
-        assertTrue(launcherJob.isSuccessful());
-        Map<String, String> actionData = 
LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
+        Context context = createContextWithCredentials(MAP_REDUCE, actionXml);
+        final String launcherId = submitAction(context);
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
+
+        Map<String, String> actionData = 
LauncherHelper.getActionData(getFileSystem(), context.getActionDir(),
                 context.getProtoActionConf());
-        assertTrue(LauncherMapperHelper.hasIdSwap(actionData));
+        assertTrue(LauncherHelper.hasIdSwap(actionData));
 
         MapReduceActionExecutor ae = new MapReduceActionExecutor();
         ae.check(context, context.getAction());
         assertTrue(launcherId.equals(context.getAction().getExternalId()));
 
-        JobConf conf = ae.createBaseHadoopConf(context, 
XmlUtils.parseXml(actionXml));
+        Configuration conf = ae.createBaseHadoopConf(context, 
XmlUtils.parseXml(actionXml));
         String user = conf.get("user.name");
-        String group = conf.get("group.name");
         JobClient jobClient = 
Services.get().get(HadoopAccessorService.class).createJobClient(user, conf);
         final RunningJob mrJob = 
jobClient.getJob(JobID.forName(context.getAction().getExternalChildIDs()));
 
@@ -504,7 +476,7 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
         assertTrue(mrJob.isSuccessful());
         ae.check(context, context.getAction());
 
-        assertEquals("SUCCEEDED", context.getAction().getExternalStatus());
+        assertEquals(JavaActionExecutor.SUCCEEDED, 
context.getAction().getExternalStatus());
         assertNull(context.getAction().getData());
 
         ae.end(context, context.getAction());
@@ -513,6 +485,12 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
         assertTrue(MapperReducerCredentialsForTest.hasCredentials(mrJob));
     }
 
+    protected XConfiguration getSleepMapReduceConfig(String inputDir, String 
outputDir) {
+        XConfiguration conf = getMapReduceConfig(inputDir, outputDir);
+        conf.set("mapred.mapper.class", BlockingMapper.class.getName());
+        return conf;
+    }
+
     protected XConfiguration getMapReduceConfig(String inputDir, String 
outputDir) {
         XConfiguration conf = new XConfiguration();
         conf.set("mapred.mapper.class", MapperReducerForTest.class.getName());
@@ -555,7 +533,37 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
         String actionXml = "<map-reduce>" + "<job-tracker>" + 
getJobTrackerUri() + "</job-tracker>" + "<name-node>"
                 + getNameNodeUri() + "</name-node>"
                 + getMapReduceConfig(inputDir.toString(), 
outputDir.toString()).toXmlString(false) + "</map-reduce>";
-        _testSubmit("map-reduce", actionXml);
+        _testSubmit(MAP_REDUCE, actionXml);
+    }
+
+    public void testMapReduceActionError() throws Exception {
+        FileSystem fs = getFileSystem();
+
+        Path inputDir = new Path(getFsTestCaseDir(), "input");
+        Path outputDir = new Path(getFsTestCaseDir(), "output1");
+
+        Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, 
"data.txt")));
+        w.write("dummy\n");
+        w.write("dummy\n");
+        Writer ow = new OutputStreamWriter(fs.create(new Path(outputDir, 
"data.txt")));
+        ow.write("dummy\n");
+        ow.write("dummy\n");
+        ow.close();
+
+        String actionXml = "<map-reduce>" +
+                "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
+                "<name-node>" + getNameNodeUri() + "</name-node>" +
+                "<configuration>" +
+                "<property><name>mapred.mapper.class</name><value>" + 
MapperReducerForTest.class.getName() +
+                "</value></property>" +
+                "<property><name>mapred.reducer.class</name><value>" + 
MapperReducerForTest.class.getName() +
+                "</value></property>" +
+                "<property><name>mapred.input.dir</name><value>" + inputDir + 
"</value></property>" +
+                "<property><name>mapred.output.dir</name><value>" + outputDir 
+ "</value></property>" +
+                "</configuration>" +
+                "</map-reduce>";
+
+        _testSubmitError(actionXml, "already exists");
     }
 
     public void testMapReduceWithConfigClass() throws Exception {
@@ -569,7 +577,7 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
         w.write("dummy\n");
         w.close();
 
-        Path jobXml = new Path(getFsTestCaseDir(), "job.xml");
+        Path jobXml = new Path(getFsTestCaseDir(), "action.xml");
         XConfiguration conf = getMapReduceConfig(inputDir.toString(), 
outputDir.toString());
         conf.set(MapperReducerForTest.JOB_XML_OUTPUT_LOCATION, 
jobXml.toUri().toString());
         conf.set("B", "b");
@@ -578,9 +586,10 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
                 + conf.toXmlString(false)
                 + "<config-class>" + 
OozieActionConfiguratorForTest.class.getName() + "</config-class>" + 
"</map-reduce>";
 
-        _testSubmit("map-reduce", actionXml);
+        _testSubmit(MAP_REDUCE, actionXml);
         Configuration conf2 = new Configuration(false);
         conf2.addResource(fs.open(jobXml));
+
         assertEquals("a", conf2.get("A"));
         assertEquals("c", conf2.get("B"));
     }
@@ -601,18 +610,11 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
                 + getMapReduceConfig(inputDir.toString(), 
outputDir.toString()).toXmlString(false)
                 + 
"<config-class>org.apache.oozie.does.not.exist</config-class>" + 
"</map-reduce>";
 
-        Context context = createContext("map-reduce", actionXml);
-        final RunningJob launcherJob = submitAction(context);
-        waitFor(120 * 2000, new Predicate() {
-            @Override
-            public boolean evaluate() throws Exception {
-                return launcherJob.isComplete();
-            }
-        });
-        assertTrue(launcherJob.isSuccessful());
-        assertFalse(LauncherMapperHelper.isMainSuccessful(launcherJob));
+        Context context = createContext(MAP_REDUCE, actionXml);
+        final String launcherId = submitAction(context);
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
 
-        final Map<String, String> actionData = 
LauncherMapperHelper.getActionData(fs, context.getActionDir(),
+        final Map<String, String> actionData = 
LauncherHelper.getActionData(fs, context.getActionDir(),
                 context.getProtoActionConf());
         Properties errorProps = 
PropertiesUtils.stringToProperties(actionData.get(LauncherMapper.ACTION_DATA_ERROR_PROPS));
         assertEquals("An Exception occurred while instantiating the action 
config class",
@@ -638,24 +640,55 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
                 + conf.toXmlString(false)
                 + "<config-class>" + 
OozieActionConfiguratorForTest.class.getName() + "</config-class>" + 
"</map-reduce>";
 
-        Context context = createContext("map-reduce", actionXml);
-        final RunningJob launcherJob = submitAction(context);
-        waitFor(120 * 2000, new Predicate() {
-            @Override
-            public boolean evaluate() throws Exception {
-                return launcherJob.isComplete();
-            }
-        });
-        assertTrue(launcherJob.isSuccessful());
-        assertFalse(LauncherMapperHelper.isMainSuccessful(launcherJob));
+        Context context = createContext(MAP_REDUCE, actionXml);
+        final String launcherId = submitAction(context);
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
 
-        final Map<String, String> actionData = 
LauncherMapperHelper.getActionData(fs, context.getActionDir(),
+        final Map<String, String> actionData = 
LauncherHelper.getActionData(fs, context.getActionDir(),
                 context.getProtoActionConf());
         Properties errorProps = 
PropertiesUtils.stringToProperties(actionData.get(LauncherMapper.ACTION_DATA_ERROR_PROPS));
         assertEquals("doh", errorProps.getProperty("exception.message"));
         
assertTrue(errorProps.getProperty("exception.stacktrace").startsWith(OozieActionConfiguratorException.class.getName()));
     }
 
+    public void testMapReduceActionKill() throws Exception {
+        FileSystem fs = getFileSystem();
+
+        Path inputDir = new Path(getFsTestCaseDir(), "input");
+        Path outputDir = new Path(getFsTestCaseDir(), "output");
+
+        Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, 
"data.txt")));
+        w.write("dummy\n");
+        w.write("dummy\n");
+        w.close();
+
+        String actionXml = "<map-reduce>" + "<job-tracker>" + 
getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+                + getNameNodeUri() + "</name-node>"
+                + getSleepMapReduceConfig(inputDir.toString(), 
outputDir.toString()).toXmlString(false) + "</map-reduce>";
+
+        Context context = createContext(MAP_REDUCE, actionXml);
+        final String launcherId = submitAction(context);
+        // wait until LauncherAM terminates - the MR job keeps running in the background
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
+
+        MapReduceActionExecutor mae = new MapReduceActionExecutor();
+        mae.check(context, context.getAction());  // must be called so that 
externalChildIDs are read from HDFS
+        Configuration conf = mae.createBaseHadoopConf(context, 
XmlUtils.parseXml(actionXml));
+        String user = conf.get("user.name");
+        JobClient jobClient = 
Services.get().get(HadoopAccessorService.class).createJobClient(user, conf);
+        final RunningJob mrJob = 
jobClient.getJob(JobID.forName(context.getAction().getExternalChildIDs()));
+
+        mae.kill(context, context.getAction());
+
+        waitFor(10_000, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                return mrJob.isComplete();
+            }
+        });
+        assertEquals(JobStatus.State.KILLED, mrJob.getJobStatus().getState());
+    }
+
     public void testMapReduceWithCredentials() throws Exception {
         FileSystem fs = getFileSystem();
 
@@ -671,7 +704,7 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
                 + getNameNodeUri() + "</name-node>"
                 + getMapReduceCredentialsConfig(inputDir.toString(), 
outputDir.toString()).toXmlString(false)
                 + "</map-reduce>";
-        _testSubmitWithCredentials("map-reduce", actionXml);
+        _testSubmitWithCredentials(MAP_REDUCE, actionXml);
     }
 
     protected Path createAndUploadUberJar() throws Exception {
@@ -734,7 +767,7 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
         String actionXml = "<map-reduce>" + "<job-tracker>" + 
getJobTrackerUri() + "</job-tracker>" + "<name-node>"
                 + getNameNodeUri() + "</name-node>"
                 + getMapReduceUberJarConfig(inputDir.toString(), 
outputDir.toString()).toXmlString(false) + "</map-reduce>";
-        String jobID = _testSubmit("map-reduce", actionXml);
+        String jobID = _testSubmit(MAP_REDUCE, actionXml);
 
         boolean containsLib1Jar = false;
         String lib1JarStr = "jobcache/" + jobID + "/jars/lib/lib1.jar";
@@ -914,7 +947,7 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
                     + "#wordcount-simple" + "</program>" + "      </pipes>"
                     + getPipesConfig(inputDir.toString(), 
outputDir.toString()).toXmlString(false) + "<file>"
                     + programPath + "</file>" + "</map-reduce>";
-            _testSubmit("pipes", actionXml);
+            _testSubmit(PIPES, actionXml);
         }
         else {
             System.out.println(
@@ -948,22 +981,16 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
                 + 
getOozieActionExternalStatsWriteProperty(inputDir.toString(), 
outputDir.toString(), "true")
                         .toXmlString(false) + "</map-reduce>";
 
-        Context context = createContext("map-reduce", actionXml);
-        final RunningJob launcherJob = submitAction(context);
-        String launcherId = context.getAction().getExternalId();
-        waitFor(120 * 1000, new Predicate() {
-            public boolean evaluate() throws Exception {
-                return launcherJob.isComplete();
-            }
-        });
-        assertTrue(launcherJob.isSuccessful());
+        Context context = createContext(MAP_REDUCE, actionXml);
+        final String launcherId = submitAction(context);
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
 
         MapReduceActionExecutor ae = new MapReduceActionExecutor();
-        JobConf conf = ae.createBaseHadoopConf(context, 
XmlUtils.parseXml(actionXml));
+        Configuration conf = ae.createBaseHadoopConf(context, 
XmlUtils.parseXml(actionXml));
 
-        Map<String, String> actionData = 
LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
+        Map<String, String> actionData = 
LauncherHelper.getActionData(getFileSystem(), context.getActionDir(),
                 conf);
-        assertTrue(LauncherMapperHelper.hasIdSwap(actionData));
+        assertTrue(LauncherHelper.hasIdSwap(actionData));
 
         ae.check(context, context.getAction());
         assertTrue(launcherId.equals(context.getAction().getExternalId()));
@@ -981,7 +1008,7 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
         assertTrue(mrJob.isSuccessful());
         ae.check(context, context.getAction());
 
-        assertEquals("SUCCEEDED", context.getAction().getExternalStatus());
+        assertEquals(JavaActionExecutor.SUCCEEDED, 
context.getAction().getExternalStatus());
         assertNull(context.getAction().getData());
 
         ae.end(context, context.getAction());
@@ -1026,24 +1053,19 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
                 + 
getOozieActionExternalStatsWriteProperty(inputDir.toString(), 
outputDir.toString(), "false")
                         .toXmlString(false) + "</map-reduce>";
 
-        Context context = createContext("map-reduce", actionXml);
-        final RunningJob launcherJob = submitAction(context);
-        String launcherId = context.getAction().getExternalId();
-        waitFor(120 * 2000, new Predicate() {
-            public boolean evaluate() throws Exception {
-                return launcherJob.isComplete();
-            }
-        });
-        assertTrue(launcherJob.isSuccessful());
-        Map<String, String> actionData = 
LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
+        Context context = createContext(MAP_REDUCE, actionXml);
+        final String launcherId = submitAction(context);
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
+
+        Map<String, String> actionData = 
LauncherHelper.getActionData(getFileSystem(), context.getActionDir(),
                 context.getProtoActionConf());
-        assertTrue(LauncherMapperHelper.hasIdSwap(actionData));
+        assertTrue(LauncherHelper.hasIdSwap(actionData));
 
         MapReduceActionExecutor ae = new MapReduceActionExecutor();
         ae.check(context, context.getAction());
         assertTrue(launcherId.equals(context.getAction().getExternalId()));
 
-        JobConf conf = ae.createBaseHadoopConf(context, 
XmlUtils.parseXml(actionXml));
+        Configuration conf = ae.createBaseHadoopConf(context, 
XmlUtils.parseXml(actionXml));
         String user = conf.get("user.name");
         String group = conf.get("group.name");
         JobClient jobClient = 
Services.get().get(HadoopAccessorService.class).createJobClient(user, conf);
@@ -1057,7 +1079,7 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
         assertTrue(mrJob.isSuccessful());
         ae.check(context, context.getAction());
 
-        assertEquals("SUCCEEDED", context.getAction().getExternalStatus());
+        assertEquals(JavaActionExecutor.SUCCEEDED, 
context.getAction().getExternalStatus());
         assertNull(context.getAction().getData());
 
         ae.end(context, context.getAction());
@@ -1098,24 +1120,19 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
                 + 
getOozieActionExternalStatsWriteProperty(inputDir.toString(), 
outputDir.toString(), "false")
                 .toXmlString(false) + "</map-reduce>";
 
-        Context context = createContext("map-reduce", actionXml);
-        final RunningJob launcherJob = submitAction(context);
-        String launcherId = context.getAction().getExternalId();
-        waitFor(120 * 2000, new Predicate() {
-            public boolean evaluate() throws Exception {
-                return launcherJob.isComplete();
-            }
-        });
-        assertTrue(launcherJob.isSuccessful());
-        Map<String, String> actionData = 
LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
+        Context context = createContext(MAP_REDUCE, actionXml);
+        final String launcherId = submitAction(context);
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
+
+        Map<String, String> actionData = 
LauncherHelper.getActionData(getFileSystem(), context.getActionDir(),
                 context.getProtoActionConf());
-        assertTrue(LauncherMapperHelper.hasIdSwap(actionData));
+        assertTrue(LauncherHelper.hasIdSwap(actionData));
 
         MapReduceActionExecutor ae = new MapReduceActionExecutor();
         ae.check(context, context.getAction());
         assertTrue(launcherId.equals(context.getAction().getExternalId()));
 
-        JobConf conf = ae.createBaseHadoopConf(context, 
XmlUtils.parseXml(actionXml));
+        Configuration conf = ae.createBaseHadoopConf(context, 
XmlUtils.parseXml(actionXml));
         String user = conf.get("user.name");
         String group = conf.get("group.name");
         JobClient jobClient = 
Services.get().get(HadoopAccessorService.class).createJobClient(user, conf);
@@ -1129,7 +1146,7 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
         assertTrue(mrJob.isSuccessful());
         ae.check(context, context.getAction());
 
-        assertEquals("SUCCEEDED", context.getAction().getExternalStatus());
+        assertEquals(JavaActionExecutor.SUCCEEDED, 
context.getAction().getExternalStatus());
         assertNull(context.getAction().getData());
 
         actionXml = "<map-reduce>"
@@ -1185,35 +1202,24 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
                 .append(mrConfig.toXmlString(false)).append("</map-reduce>");
         String actionXml = sb.toString();
 
-        Context context = createContext("map-reduce", actionXml);
-        final RunningJob launcherJob = submitAction(context);
-        String launcherId = context.getAction().getExternalId();
-        waitFor(120 * 2000, new Predicate() {
-            public boolean evaluate() throws Exception {
-                return launcherJob.isComplete();
-            }
-        });
+        Context context = createContext(MAP_REDUCE, actionXml);
+        final String launcherId = submitAction(context);
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
 
-        assertTrue(launcherJob.isSuccessful());
-        Map<String, String> actionData = 
LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
+        Map<String, String> actionData = 
LauncherHelper.getActionData(getFileSystem(), context.getActionDir(),
                 context.getProtoActionConf());
-        assertTrue(LauncherMapperHelper.hasIdSwap(actionData));
-        // Assert launcher job name has been set
-        System.out.println("Launcher job name: " + launcherJob.getJobName());
-        assertTrue(launcherJob.getJobName().equals(launcherJobName));
+        assertTrue(LauncherHelper.hasIdSwap(actionData));
 
         MapReduceActionExecutor ae = new MapReduceActionExecutor();
         ae.check(context, context.getAction());
         assertTrue(launcherId.equals(context.getAction().getExternalId()));
 
-        JobConf conf = ae.createBaseHadoopConf(context,
-                XmlUtils.parseXml(actionXml));
+        Configuration conf = ae.createBaseHadoopConf(context, 
XmlUtils.parseXml(actionXml));
         String user = conf.get("user.name");
 
         JobClient jobClient = Services.get().get(HadoopAccessorService.class)
                 .createJobClient(user, conf);
-        final RunningJob mrJob = jobClient.getJob(JobID.forName(context
-                .getAction().getExternalChildIDs()));
+        final RunningJob mrJob = 
jobClient.getJob(JobID.forName(context.getAction().getExternalChildIDs()));
 
         waitFor(120 * 1000, new Predicate() {
             public boolean evaluate() throws Exception {
@@ -1223,7 +1229,7 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
         assertTrue(mrJob.isSuccessful());
         ae.check(context, context.getAction());
 
-        assertEquals("SUCCEEDED", context.getAction().getExternalStatus());
+        assertEquals(JavaActionExecutor.SUCCEEDED, 
context.getAction().getExternalStatus());
         assertNull(context.getAction().getData());
 
         ae.end(context, context.getAction());
@@ -1304,7 +1310,7 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
 
         Element eActionXml = XmlUtils.parseXml(actionXml);
 
-        Context context = createContext("map-reduce", actionXml);
+        Context context = createContext(MAP_REDUCE, actionXml);
 
         Path appPath = getAppPath();
 

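Side note (not part of the patch): the refactored tests above replace the RunningJob-based polling with a submitAction() that returns only the launcher application id as a String, and a waitUntilYarnAppDoneAndAssertSuccess() call that blocks until the YARN application finishes. Below is a minimal standalone sketch of that poll-until-done idea, using a plain Callable<Boolean> in place of Oozie's Predicate and assuming nothing from the Oozie test harness; the real XTestCase.waitFor differs in details (e.g. wait scaling), so treat this only as an illustration of the pattern.

import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

public class PollUntilDone {

    // Polls the predicate every half second until it returns true or the timeout elapses.
    static boolean waitFor(long timeoutMs, Callable<Boolean> predicate) throws Exception {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (predicate.call()) {
                return true;
            }
            TimeUnit.MILLISECONDS.sleep(500);
        }
        return false;
    }

    public static void main(String[] args) throws Exception {
        final long startedAt = System.currentTimeMillis();
        // Dummy "application": reports completion after two seconds.
        boolean done = waitFor(10_000, new Callable<Boolean>() {
            @Override
            public Boolean call() {
                return System.currentTimeMillis() - startedAt > 2_000;
            }
        });
        System.out.println(done ? "application finished" : "timed out waiting");
    }
}
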
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/tools/src/main/java/org/apache/oozie/tools/OozieSharelibCLI.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/oozie/tools/OozieSharelibCLI.java 
b/tools/src/main/java/org/apache/oozie/tools/OozieSharelibCLI.java
index 9aa4cb6..4e8bb4b 100644
--- a/tools/src/main/java/org/apache/oozie/tools/OozieSharelibCLI.java
+++ b/tools/src/main/java/org/apache/oozie/tools/OozieSharelibCLI.java
@@ -47,6 +47,8 @@ import org.apache.oozie.service.HadoopAccessorService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.WorkflowAppService;
 
+import com.google.common.base.Preconditions;
+
 public class OozieSharelibCLI {
     public static final String[] HELP_INFO = {
             "",
@@ -255,20 +257,27 @@ public class OozieSharelibCLI {
     private List<Future<Void>> copyFolderRecursively(final FileSystem fs, 
final ExecutorService threadPool,
             File srcFile, final Path dstPath) throws IOException {
         List<Future<Void>> taskList = new ArrayList<Future<Void>>();
-        for (final File file : srcFile.listFiles()) {
-            final Path trgName = new Path(dstPath, file.getName());
-            if (file.isDirectory()) {
-                taskList.addAll(copyFolderRecursively(fs, threadPool, file, 
trgName));
-            } else {
-                taskList.add(threadPool.submit(new Callable<Void>() {
-                    @Override
-                    public Void call() throws Exception {
-                        fs.copyFromLocalFile(new Path(file.toURI()), trgName);
-                        return null;
-                    }
-                }));
+        File[] files = srcFile.listFiles();
+
+        if (files != null) {
+            for (final File file : files) {
+                final Path trgName = new Path(dstPath, file.getName());
+                if (file.isDirectory()) {
+                    taskList.addAll(copyFolderRecursively(fs, threadPool, 
file, trgName));
+                } else {
+                    taskList.add(threadPool.submit(new Callable<Void>() {
+                        @Override
+                        public Void call() throws Exception {
+                            fs.copyFromLocalFile(new Path(file.toURI()), 
trgName);
+                            return null;
+                        }
+                    }));
+                }
             }
+        } else {
+            System.out.println("WARNING: directory listing of " + 
srcFile.getAbsolutePath().toString() + " returned null");
         }
+
         return taskList;
     }
 }

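Side note (not part of the patch): the null check added above matters because java.io.File.listFiles() returns null, not an empty array, when the path is not a directory or an I/O error occurs. A minimal standalone sketch of the same guard:

import java.io.File;

public class ListFilesNullCheck {
    public static void main(String[] args) {
        // listFiles() may return null, so check before iterating -- the same
        // guard the patch adds to copyFolderRecursively().
        File dir = new File(args.length > 0 ? args[0] : ".");
        File[] files = dir.listFiles();
        if (files != null) {
            for (File f : files) {
                System.out.println((f.isDirectory() ? "dir:  " : "file: ") + f.getName());
            }
        } else {
            System.out.println("WARNING: directory listing of " + dir.getAbsolutePath() + " returned null");
        }
    }
}
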
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/webapp/pom.xml
----------------------------------------------------------------------
diff --git a/webapp/pom.xml b/webapp/pom.xml
index e4fdfb7..4dc0c30 100644
--- a/webapp/pom.xml
+++ b/webapp/pom.xml
@@ -37,8 +37,8 @@
             <artifactId>oozie-client</artifactId>
             <exclusions>
                 <exclusion>
-                    <groupId>org.apache.oozie</groupId>
-                    <artifactId>oozie-hadoop-auth</artifactId>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-auth</artifactId>
                 </exclusion>
                 <exclusion>
                     <groupId>org.slf4j</groupId>
@@ -273,8 +273,8 @@
                     <scope>compile</scope>
                 </dependency>
                 <dependency>
-                    <groupId>org.apache.oozie</groupId>
-                    <artifactId>oozie-hadoop-auth</artifactId>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-auth</artifactId>
                     <scope>compile</scope>
                 </dependency>
                 <dependency>
