http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAMCallbackNotifier.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAMCallbackNotifier.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAMCallbackNotifier.java new file mode 100644 index 0000000..1f7e5b2 --- /dev/null +++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAMCallbackNotifier.java @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.action.hadoop; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.oozie.QueryServlet; +import org.apache.oozie.action.hadoop.LauncherAM.OozieActionResult; +import org.apache.oozie.command.wf.HangServlet; +import org.apache.oozie.test.EmbeddedServletContainer; +import org.apache.oozie.test.XTestCase; +import org.junit.Assert; +import org.mockito.Mockito; + +import java.net.Proxy; +import java.util.HashMap; +import java.util.Map; + +// A lot of this adapted from org.apache.hadoop.mapreduce.v2.app.TestJobEndNotifier and org.apache.hadoop.mapred.TestJobEndNotifier +public class TestLauncherAMCallbackNotifier extends XTestCase { + private EmbeddedServletContainer container; + + @Override + public void setUp() throws Exception { + super.setUp(); + QueryServlet.lastQueryString = null; + } + + @Override + public void tearDown() throws Exception { + if (container != null) { + container.stop(); + } + + super.tearDown(); + } + + public void testConfiguration() throws Exception { + Configuration conf = new Configuration(false); + + conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS, "0"); + conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_ATTEMPTS, "10"); + LauncherAMCallbackNotifier cn = new LauncherAMCallbackNotifier(conf); + assertEquals(0, cn.numTries); + conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS, "1"); + cn = new LauncherAMCallbackNotifier(conf); + assertEquals(1, cn.numTries); + conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS, "20"); + cn = new LauncherAMCallbackNotifier(conf); + assertEquals(11, cn.numTries); //11 because number of _retries_ is 10 + + conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, "1000"); + cn = new LauncherAMCallbackNotifier(conf); + assertEquals(1000, cn.waitInterval); + conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, "10000"); + cn = new LauncherAMCallbackNotifier(conf); + assertEquals(5000, cn.waitInterval); + //Test negative numbers are set to default + conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, "-10"); + cn = new LauncherAMCallbackNotifier(conf); + assertEquals(5000, cn.waitInterval); + + conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_TIMEOUT, "1000"); + cn = new LauncherAMCallbackNotifier(conf); + assertEquals(1000, cn.timeout); + + conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_PROXY, "somehost"); + cn = new LauncherAMCallbackNotifier(conf); + assertEquals(Proxy.Type.DIRECT, cn.proxyToUse.type()); + conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_PROXY, "somehost:someport"); + cn = new LauncherAMCallbackNotifier(conf); + assertEquals(Proxy.Type.DIRECT, cn.proxyToUse.type()); + conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_PROXY, "somehost:1000"); + cn = new LauncherAMCallbackNotifier(conf); + assertEquals("HTTP @ somehost:1000", cn.proxyToUse.toString()); + conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_PROXY, "socks@somehost:1000"); + cn = new LauncherAMCallbackNotifier(conf); + assertEquals("SOCKS @ somehost:1000", cn.proxyToUse.toString()); + conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_PROXY, "SOCKS@somehost:1000"); + cn = new LauncherAMCallbackNotifier(conf); + assertEquals("SOCKS @ somehost:1000", cn.proxyToUse.toString()); + conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_PROXY, "sfafn@somehost:1000"); + cn = new LauncherAMCallbackNotifier(conf); + assertEquals("HTTP @ somehost:1000", cn.proxyToUse.toString()); + } + + public void testNotifyRetries() throws InterruptedException { + Configuration conf = new Configuration(false); + conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_ATTEMPTS, "0"); + conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS, "1"); + conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_URL, "http://nonexistent"); + conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, "5000"); + + LauncherAMCallbackNotifier cnSpy = Mockito.spy(new LauncherAMCallbackNotifier(conf)); + + long start = System.currentTimeMillis(); + cnSpy.notifyURL(OozieActionResult.SUCCEEDED); + long end = System.currentTimeMillis(); + Mockito.verify(cnSpy, Mockito.times(1)).notifyURLOnce(); + Assert.assertTrue("Should have taken more than 5 seconds but it only took " + (end - start), end - start >= 5000); + + conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS, "3"); + conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_ATTEMPTS, "3"); + conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, "3000"); + + cnSpy = Mockito.spy(new LauncherAMCallbackNotifier(conf)); + start = System.currentTimeMillis(); + cnSpy.notifyURL(OozieActionResult.SUCCEEDED); + end = System.currentTimeMillis(); + Mockito.verify(cnSpy, Mockito.times(3)).notifyURLOnce(); + Assert.assertTrue("Should have taken more than 9 seconds but it only took " + (end - start), end - start >= 9000); + } + + public void testNotifyTimeout() throws Exception { + Map<String, String> params = new HashMap<String, String>(); + params.put(HangServlet.SLEEP_TIME_MS, "1000000"); + Configuration conf = setupEmbeddedContainer(HangServlet.class, "/hang/*", "/hang/*", params); + + LauncherAMCallbackNotifier cnSpy = Mockito.spy(new LauncherAMCallbackNotifier(conf)); + long start = System.currentTimeMillis(); + cnSpy.notifyURL(OozieActionResult.SUCCEEDED); + long end = System.currentTimeMillis(); + Mockito.verify(cnSpy, Mockito.times(1)).notifyURLOnce(); + Assert.assertTrue("Should have taken more than 5 seconds but it only took " + (end - start), end - start >= 5000); + } + + public void testNotify() throws Exception { + Configuration conf = setupEmbeddedContainer(QueryServlet.class, "/count/*", "/count/?status=$jobStatus", null); + + LauncherAMCallbackNotifier cn = new LauncherAMCallbackNotifier(conf); + + assertNull(QueryServlet.lastQueryString); + cn.notifyURL(OozieActionResult.SUCCEEDED); + waitForCallbackAndCheckResult(FinalApplicationStatus.SUCCEEDED.toString()); + } + + public void testNotifyBackgroundActionWhenSubmitSucceeds() throws Exception { + Configuration conf = setupEmbeddedContainer(QueryServlet.class, "/count/*", "/count/?status=$jobStatus", null); + + LauncherAMCallbackNotifier cn = new LauncherAMCallbackNotifier(conf); + + assertNull(QueryServlet.lastQueryString); + cn.notifyURL(OozieActionResult.RUNNING); + waitForCallbackAndCheckResult(OozieActionResult.RUNNING.toString()); + } + + public void testNotifyBackgroundActionWhenSubmitFailsWithFailed() throws Exception { + Configuration conf = setupEmbeddedContainer(QueryServlet.class, "/count/*", "/count/?status=$jobStatus", null); + + LauncherAMCallbackNotifier cn = new LauncherAMCallbackNotifier(conf); + + assertNull(QueryServlet.lastQueryString); + cn.notifyURL(OozieActionResult.FAILED); + waitForCallbackAndCheckResult(FinalApplicationStatus.FAILED.toString()); + } + + private Configuration setupEmbeddedContainer(Class<?> servletClass, String servletEndPoint, + String servletUrl, Map<String, String> params) throws Exception { + container = new EmbeddedServletContainer("test"); + if (servletEndPoint != null) { + if (params != null) { + container.addServletEndpoint(servletEndPoint, servletClass, params); + } else { + container.addServletEndpoint(servletEndPoint, servletClass); + } + } + container.start(); + + Configuration conf = new Configuration(false); + conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_ATTEMPTS, "0"); + conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS, "1"); + conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_URL, container.getServletURL(servletUrl)); + conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, "5000"); + + return conf; + } + + private void waitForCallbackAndCheckResult(final String expectedResult) { + waitFor(5000, new Predicate() { + @Override + public boolean evaluate() throws Exception { + return ("status=" + expectedResult).equals(QueryServlet.lastQueryString); + } + }); + + assertEquals("status=" + expectedResult, QueryServlet.lastQueryString); + } +}
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionError.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionError.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionError.java deleted file mode 100644 index 4cda615..0000000 --- a/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionError.java +++ /dev/null @@ -1,173 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.oozie.action.hadoop; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.RunningJob; -import org.apache.hadoop.mapred.JobID; -import org.apache.oozie.WorkflowActionBean; -import org.apache.oozie.WorkflowJobBean; -import org.apache.oozie.client.WorkflowAction; -import org.apache.oozie.service.HadoopAccessorService; -import org.apache.oozie.service.Services; -import org.apache.oozie.service.WorkflowAppService; -import org.apache.oozie.util.XConfiguration; -import org.apache.oozie.util.XmlUtils; -import org.apache.oozie.util.IOUtils; -import org.jdom.Element; - -import java.io.File; -import java.io.OutputStream; -import java.io.InputStream; -import java.io.FileInputStream; -import java.io.StringReader; -import java.io.Writer; -import java.io.OutputStreamWriter; - -public class TestMapReduceActionError extends ActionExecutorTestCase { - - @Override - protected void setSystemProps() throws Exception { - super.setSystemProps(); - setSystemProperty("oozie.service.ActionService.executor.classes", MapReduceActionExecutor.class.getName()); - } - - private Context createContext(String actionXml) throws Exception { - JavaActionExecutor ae = new JavaActionExecutor(); - - Path appJarPath = new Path("lib/test.jar"); - File jarFile = IOUtils.createJar(new File(getTestCaseDir()), "test.jar", MapperReducerForTest.class); - InputStream is = new FileInputStream(jarFile); - OutputStream os = getFileSystem().create(new Path(getAppPath(), "lib/test.jar")); - IOUtils.copyStream(is, os); - - XConfiguration protoConf = new XConfiguration(); - protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser()); - - protoConf.setStrings(WorkflowAppService.APP_LIB_PATH_LIST, appJarPath.toString()); - - WorkflowJobBean wf = createBaseWorkflow(protoConf, "mr-action"); - WorkflowActionBean action = (WorkflowActionBean) wf.getActions().get(0); - action.setType(ae.getType()); - action.setConf(actionXml); - - return new Context(wf, action); - } - - private RunningJob submitAction(Context context) throws Exception { - MapReduceActionExecutor ae = new MapReduceActionExecutor(); - - WorkflowAction action = context.getAction(); - - ae.prepareActionDir(getFileSystem(), context); - ae.submitLauncher(getFileSystem(), context, action); - - String jobId = action.getExternalId(); - String jobTracker = action.getTrackerUri(); - String consoleUrl = action.getConsoleUrl(); - assertNotNull(jobId); - assertNotNull(jobTracker); - assertNotNull(consoleUrl); - - Element e = XmlUtils.parseXml(action.getConf()); - XConfiguration conf = - new XConfiguration(new StringReader(XmlUtils.prettyPrint(e.getChild("configuration")).toString())); - conf.set("mapred.job.tracker", e.getChildTextTrim("job-tracker")); - conf.set("fs.default.name", e.getChildTextTrim("name-node")); - conf.set("user.name", context.getProtoActionConf().get("user.name")); - conf.set("group.name", getTestGroup()); - - conf.set("mapreduce.framework.name", "yarn"); - JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker); - XConfiguration.copy(conf, jobConf); - String user = jobConf.get("user.name"); - String group = jobConf.get("group.name"); - JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf); - final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId)); - assertNotNull(runningJob); - return runningJob; - } - - private void _testSubmit(String actionXml) throws Exception { - - Context context = createContext(actionXml); - final RunningJob launcherJob = submitAction(context); - String launcherId = context.getAction().getExternalId(); - waitFor(60 * 1000, new Predicate() { - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - - MapReduceActionExecutor ae = new MapReduceActionExecutor(); - ae.check(context, context.getAction()); - - JobConf conf = ae.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml)); - String user = conf.get("user.name"); - String group = conf.get("group.name"); - JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf); - final RunningJob mrJob = jobClient.getJob(JobID.forName(context.getAction().getExternalId())); - - waitFor(60 * 1000, new Predicate() { - public boolean evaluate() throws Exception { - return mrJob.isComplete(); - } - }); - ae.check(context, context.getAction()); - - assertEquals("FAILED/KILLED", context.getAction().getExternalStatus()); - - ae.end(context, context.getAction()); - assertEquals(WorkflowAction.Status.ERROR, context.getAction().getStatus()); - assertTrue(context.getAction().getErrorMessage().contains("already exists")); - } - - public void testMapReduce() throws Exception { - FileSystem fs = getFileSystem(); - - Path inputDir = new Path(getFsTestCaseDir(), "input"); - Path outputDir = new Path(getFsTestCaseDir(), "output1"); - - Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt"))); - w.write("dummy\n"); - w.write("dummy\n"); - Writer ow = new OutputStreamWriter(fs.create(new Path(outputDir, "data.txt"))); - ow.write("dummy\n"); - ow.write("dummy\n"); - ow.close(); - - String actionXml = "<map-reduce>" + - "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + - "<name-node>" + getNameNodeUri() + "</name-node>" + - "<configuration>" + - "<property><name>mapred.mapper.class</name><value>" + MapperReducerForTest.class.getName() + - "</value></property>" + - "<property><name>mapred.reducer.class</name><value>" + MapperReducerForTest.class.getName() + - "</value></property>" + - "<property><name>mapred.input.dir</name><value>" + inputDir + "</value></property>" + - "<property><name>mapred.output.dir</name><value>" + outputDir + "</value></property>" + - "</configuration>" + - "</map-reduce>"; - _testSubmit(actionXml); - } - -} http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/action/hadoop/TestOozieJobInfo.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestOozieJobInfo.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestOozieJobInfo.java index 5bc7d00..551adff 100644 --- a/core/src/test/java/org/apache/oozie/action/hadoop/TestOozieJobInfo.java +++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestOozieJobInfo.java @@ -33,19 +33,12 @@ import java.util.regex.Matcher; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobID; -import org.apache.hadoop.mapred.RunningJob; import org.apache.oozie.BundleActionBean; import org.apache.oozie.BundleJobBean; import org.apache.oozie.CoordinatorJobBean; import org.apache.oozie.ErrorCode; import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.WorkflowJobBean; -import org.apache.oozie.action.hadoop.MapReduceActionExecutor; -import org.apache.oozie.action.hadoop.MapperReducerForTest; -import org.apache.oozie.action.hadoop.OozieJobInfo; import org.apache.oozie.client.Job; import org.apache.oozie.client.OozieClient; import org.apache.oozie.command.CommandException; @@ -53,19 +46,16 @@ import org.apache.oozie.command.bundle.BundleStartXCommand; import org.apache.oozie.command.bundle.BundleSubmitXCommand; import org.apache.oozie.command.wf.ActionXCommand; import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext; -import org.apache.oozie.command.wf.JobXCommand; import org.apache.oozie.executor.jpa.BundleActionQueryExecutor; +import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery; import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor; import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor; import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor; import org.apache.oozie.executor.jpa.WorkflowJobsGetFromCoordParentIdJPAExecutor; -import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery; -import org.apache.oozie.service.HadoopAccessorService; import org.apache.oozie.service.JPAService; import org.apache.oozie.service.Services; import org.apache.oozie.service.UUIDService; -import org.apache.oozie.service.ConfigurationService; import org.apache.oozie.service.UUIDService.ApplicationType; import org.apache.oozie.test.XDataTestCase; import org.apache.oozie.util.IOUtils; @@ -163,14 +153,12 @@ public class TestOozieJobInfo extends XDataTestCase { ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfbean, actionList.get(1), false, false); MapReduceActionExecutor actionExecutor = new MapReduceActionExecutor(); - JobConf conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(actionList.get(1).getConf())); + Configuration conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(actionList.get(1).getConf())); String user = conf.get("user.name"); - JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf); - String launcherId = actionList.get(1).getExternalId(); - final RunningJob launcherJob = jobClient.getJob(JobID.forName(launcherId)); - FileSystem fs = context.getAppFileSystem(); - Configuration jobXmlConf = new XConfiguration(fs.open(new Path(launcherJob.getJobFile()))); + FileSystem fs = getFileSystem(); + Configuration jobXmlConf = new XConfiguration(fs.open(getPathToWorkflowResource( + user, wfbean, services, context, LauncherAM.LAUNCHER_JOB_CONF_XML))); String jobInfo = jobXmlConf.get(OozieJobInfo.JOB_INFO_KEY); // BUNDLE_ID;BUNDLE_NAME;COORDINATOR_NAME;COORDINATOR_NOMINAL_TIME; @@ -186,7 +174,6 @@ public class TestOozieJobInfo extends XDataTestCase { assertTrue(jobInfo.contains(",testing=test,")); assertTrue(jobInfo.contains(",coord.nominal.time=")); assertTrue(jobInfo.contains("launcher=true")); - } protected void setCoordConf(Configuration jobConf) throws IOException { http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/action/hadoop/TestPrepareActionsDriver.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestPrepareActionsDriver.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestPrepareActionsDriver.java index df9e939..89aeab6 100644 --- a/core/src/test/java/org/apache/oozie/action/hadoop/TestPrepareActionsDriver.java +++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestPrepareActionsDriver.java @@ -24,6 +24,9 @@ import org.apache.oozie.service.Services; import org.apache.oozie.test.XFsTestCase; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.mapred.JobConf; +import org.xml.sax.SAXException; + +import javax.xml.parsers.ParserConfigurationException; public class TestPrepareActionsDriver extends XFsTestCase { @@ -40,7 +43,7 @@ public class TestPrepareActionsDriver extends XFsTestCase { } // Test to check if prepare action is performed as expected when the prepare XML block is a valid one - public void testDoOperationsWithValidXML() throws LauncherException, IOException { + public void testDoOperationsWithValidXML() throws LauncherException, IOException, ParserConfigurationException, SAXException { Path actionDir = getFsTestCaseDir(); FileSystem fs = getFileSystem(); Path newDir = new Path(actionDir, "newDir"); @@ -52,12 +55,12 @@ public class TestPrepareActionsDriver extends XFsTestCase { } JobConf conf = createJobConf(); - LauncherMapperHelper.setupLauncherURIHandlerConf(conf); + LauncherHelper.setupLauncherURIHandlerConf(conf); PrepareActionsDriver.doOperations(prepareXML, conf); assertTrue(fs.exists(actionDir)); } - // Test to check if LauncherException is thrown when the prepare XML block is invalid + // Test to check if Exception is thrown when the prepare XML block is invalid public void testDoOperationsWithInvalidXML() throws LauncherException, IOException { Path actionDir = getFsTestCaseDir(); FileSystem fs = getFileSystem(); @@ -72,14 +75,12 @@ public class TestPrepareActionsDriver extends XFsTestCase { try { prepareXML = "prepare>" + "<mkdir path='" + newDir + "'/>" + "</prepare>"; JobConf conf = createJobConf(); - LauncherMapperHelper.setupLauncherURIHandlerConf(conf); + LauncherHelper.setupLauncherURIHandlerConf(conf); PrepareActionsDriver.doOperations(prepareXML, conf); fail("Expected to catch an exception but did not encounter any"); - } catch (LauncherException le) { - assertEquals(le.getCause().getClass(), org.xml.sax.SAXParseException.class); - assertEquals(le.getMessage(), "Content is not allowed in prolog."); - } catch(Exception ex){ - fail("Expected a LauncherException but received an Exception"); + } catch (Exception ex) { + assertEquals(ex.getClass(), org.xml.sax.SAXParseException.class); + assertEquals(ex.getMessage(), "Content is not allowed in prolog."); } } } http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java index f12927b..72be0a2 100644 --- a/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java +++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java @@ -26,16 +26,10 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobID; -import org.apache.hadoop.mapred.RunningJob; import org.apache.hadoop.util.Shell; import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.WorkflowJobBean; import org.apache.oozie.client.WorkflowAction; -import org.apache.oozie.service.ActionService; -import org.apache.oozie.service.HadoopAccessorService; import org.apache.oozie.service.Services; import org.apache.oozie.service.WorkflowAppService; import org.apache.oozie.util.PropertiesUtils; @@ -294,14 +288,8 @@ public class TestShellActionExecutor extends ActionExecutorTestCase { Context context = createContext(actionXml); // Submit the action - final RunningJob launcherJob = submitAction(context); - waitFor(180 * 1000, new Predicate() { // Wait for the external job to - // finish - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - + final String launcherId = submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); ShellActionExecutor ae = new ShellActionExecutor(); WorkflowAction action = context.getAction(); ae.check(context, action); @@ -323,24 +311,14 @@ public class TestShellActionExecutor extends ActionExecutorTestCase { private WorkflowAction _testSubmit(String actionXml, boolean checkForSuccess, String capture_output) throws Exception { Context context = createContext(actionXml); - final RunningJob launcherJob = submitAction(context);// Submit the - // action - String launcherId = context.getAction().getExternalId(); // Get LM id - waitFor(180 * 1000, new Predicate() { // Wait for the external job to - // finish - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - // Thread.sleep(2000); - assertTrue(launcherJob.isSuccessful()); - - sleep(2000);// Wait more to make sure no ID swap happens + final String launcherId = submitAction(context);// Submit the action + waitUntilYarnAppDoneAndAssertSuccess(launcherId); + Configuration conf = new XConfiguration(); conf.set("user.name", getTestUser()); - Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), + Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(), conf); - assertFalse(LauncherMapperHelper.hasIdSwap(actionData)); + assertFalse(LauncherHelper.hasIdSwap(actionData)); ShellActionExecutor ae = new ShellActionExecutor(); ae.check(context, context.getAction()); @@ -399,14 +377,13 @@ public class TestShellActionExecutor extends ActionExecutorTestCase { * @return The RunningJob of the Launcher Mapper * @throws Exception */ - private RunningJob submitAction(Context context) throws Exception { + private String submitAction(Context context) throws Exception { ShellActionExecutor ae = new ShellActionExecutor(); WorkflowAction action = context.getAction(); ae.prepareActionDir(getFileSystem(), context); - ae.submitLauncher(getFileSystem(), context, action); // Submit the - // Launcher Mapper + ae.submitLauncher(getFileSystem(), context, action); // Submit the action String jobId = action.getExternalId(); String jobTracker = action.getTrackerUri(); @@ -416,41 +393,6 @@ public class TestShellActionExecutor extends ActionExecutorTestCase { assertNotNull(jobTracker); assertNotNull(consoleUrl); - Element e = XmlUtils.parseXml(action.getConf()); - XConfiguration conf = new XConfiguration(); - conf.set("mapred.job.tracker", e.getChildTextTrim("job-tracker")); - conf.set("fs.default.name", e.getChildTextTrim("name-node")); - conf.set("user.name", context.getProtoActionConf().get("user.name")); - conf.set("group.name", getTestGroup()); - - JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker); - XConfiguration.copy(conf, jobConf); - String user = jobConf.get("user.name"); - String group = jobConf.get("group.name"); - JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf); - final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId)); - assertNotNull(runningJob); - return runningJob; - } - - public void testShellMainPathInUber() throws Exception { - Services.get().getConf().setBoolean("oozie.action.shell.launcher.mapreduce.job.ubertask.enable", true); - - Element actionXml = XmlUtils.parseXml("<shell>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" - + "<name-node>" + getNameNodeUri() + "</name-node>" + "<exec>script.sh</exec>" - + "<argument>a=A</argument>" + "<argument>b=B</argument>" + "</shell>"); - ShellActionExecutor ae = new ShellActionExecutor(); - XConfiguration protoConf = new XConfiguration(); - protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser()); - - WorkflowJobBean wf = createBaseWorkflow(protoConf, "action"); - WorkflowActionBean action = (WorkflowActionBean) wf.getActions().get(0); - action.setType(ae.getType()); - - Context context = new Context(wf, action); - JobConf launcherConf = new JobConf(); - launcherConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml, launcherConf); - // env - assertEquals("PATH=.:$PATH", launcherConf.get(JavaActionExecutor.YARN_AM_ENV)); + return jobId; } } http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/action/hadoop/TestShellMain.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestShellMain.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestShellMain.java index e757e54..a7d6c18 100644 --- a/core/src/test/java/org/apache/oozie/action/hadoop/TestShellMain.java +++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestShellMain.java @@ -25,7 +25,6 @@ import java.io.FileWriter; import java.io.Writer; import java.util.Properties; -import org.apache.hadoop.fs.Path; import org.apache.oozie.util.XConfiguration; //Test cases are mainly implemented in the Base class @@ -53,8 +52,8 @@ public class TestShellMain extends ShellTestCase { jobConf.set(ShellMain.CONF_OOZIE_SHELL_EXEC, SHELL_COMMAND_NAME); String[] args = new String[] { SHELL_COMMAND_SCRIPTFILE_OPTION, script.toString(), "A", "B" }; - MapReduceMain.setStrings(jobConf, ShellMain.CONF_OOZIE_SHELL_ARGS, args); - MapReduceMain.setStrings(jobConf, ShellMain.CONF_OOZIE_SHELL_ENVS, + ActionUtils.setStrings(jobConf, ShellMain.CONF_OOZIE_SHELL_ARGS, args); + ActionUtils.setStrings(jobConf, ShellMain.CONF_OOZIE_SHELL_ENVS, new String[] { "var1=value1", "var2=value2" }); File actionXml = new File(getTestCaseDir(), "action.xml"); http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java b/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java index dbc160f..9c7064b 100644 --- a/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java +++ b/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java @@ -135,8 +135,8 @@ public class TestOozieCLI extends DagServletTestCase { String path = getTestCaseDir() + "/" + getName() + ".properties"; Properties props = new Properties(); props.setProperty(OozieClient.USER_NAME, getTestUser()); - props.setProperty(XOozieClient.NN, "localhost:9000"); - props.setProperty(XOozieClient.JT, "localhost:9001"); + props.setProperty(XOozieClient.NN, "localhost:8020"); + props.setProperty(XOozieClient.RM, "localhost:8032"); props.setProperty("oozie.libpath", appPath); props.setProperty("mapred.output.dir", appPath); props.setProperty("a", "A"); @@ -155,7 +155,7 @@ public class TestOozieCLI extends DagServletTestCase { props.setProperty(OozieClient.APP_PATH, appPath); props.setProperty(OozieClient.RERUN_SKIP_NODES, "node"); props.setProperty(XOozieClient.NN, "localhost:9000"); - props.setProperty(XOozieClient.JT, "localhost:9001"); + props.setProperty(XOozieClient.RM, "localhost:9001"); if (useNewAPI) { props.setProperty("mapreduce.map.class", "mapper.class"); props.setProperty("mapreduce.reduce.class", "reducer.class"); http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/client/TestWorkflowXClient.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/client/TestWorkflowXClient.java b/core/src/test/java/org/apache/oozie/client/TestWorkflowXClient.java index 51ae9e8..b4bce60 100644 --- a/core/src/test/java/org/apache/oozie/client/TestWorkflowXClient.java +++ b/core/src/test/java/org/apache/oozie/client/TestWorkflowXClient.java @@ -29,8 +29,6 @@ import org.apache.oozie.servlet.MockDagEngineService; import org.apache.oozie.servlet.V1JobsServlet; import org.apache.oozie.servlet.V1AdminServlet; -import java.io.File; - public class TestWorkflowXClient extends DagServletTestCase { static { @@ -60,7 +58,7 @@ public class TestWorkflowXClient extends DagServletTestCase { Path libPath = new Path(getFsTestCaseDir(), "lib"); getFileSystem().mkdirs(libPath); conf.setProperty(OozieClient.LIBPATH, libPath.toString()); - conf.setProperty(XOozieClient.JT, "localhost:9001"); + conf.setProperty(XOozieClient.RM, "localhost:9001"); conf.setProperty(XOozieClient.NN, "hdfs://localhost:9000"); String[] params = new String[]{"INPUT=input.txt"}; @@ -90,7 +88,7 @@ public class TestWorkflowXClient extends DagServletTestCase { getFileSystem().mkdirs(libPath); System.out.println(libPath.toString()); conf.setProperty(OozieClient.LIBPATH, libPath.toString()); - conf.setProperty(XOozieClient.JT, "localhost:9001"); + conf.setProperty(XOozieClient.RM, "localhost:9001"); conf.setProperty(XOozieClient.NN, "hdfs://localhost:9000"); String[] params = new String[]{"NAME=test"}; @@ -120,7 +118,7 @@ public class TestWorkflowXClient extends DagServletTestCase { getFileSystem().mkdirs(libPath); System.out.println(libPath.toString()); conf.setProperty(OozieClient.LIBPATH, libPath.toString()); - conf.setProperty(XOozieClient.JT, "localhost:9001"); + conf.setProperty(XOozieClient.RM, "localhost:9001"); conf.setProperty(XOozieClient.NN, "hdfs://localhost:9000"); assertEquals(MockDagEngineService.JOB_ID + wfCount + MockDagEngineService.JOB_ID_END, @@ -154,9 +152,9 @@ public class TestWorkflowXClient extends DagServletTestCase { fail("submit client without JT should throw exception"); } catch (RuntimeException exception) { - assertEquals("java.lang.RuntimeException: jobtracker is not specified in conf", exception.toString()); + assertEquals("java.lang.RuntimeException: Resource manager is not specified in conf", exception.toString()); } - conf.setProperty(XOozieClient.JT, "localhost:9001"); + conf.setProperty(XOozieClient.RM, "localhost:9001"); try { wc.submitMapReduce(conf); fail("submit client without NN should throw exception"); http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java index c071000..b8eb15d 100644 --- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java @@ -19,8 +19,10 @@ package org.apache.oozie.command.coord; import java.util.ArrayList; +import java.util.Collections; import java.util.Date; import java.util.List; +import java.util.Set; import org.apache.oozie.CoordinatorActionBean; import org.apache.oozie.CoordinatorJobBean; @@ -84,6 +86,11 @@ public class TestCoordChangeXCommand extends XDataTestCase { @Override public synchronized boolean queueSerial(List<? extends XCallable<?>> callables, long delay){return false;} + + @Override + public Set<String> getInterruptTypes() { + return Collections.emptySet(); + } } @Override http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/command/wf/HangServlet.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/wf/HangServlet.java b/core/src/test/java/org/apache/oozie/command/wf/HangServlet.java index 3344cf9..c1bca16 100644 --- a/core/src/test/java/org/apache/oozie/command/wf/HangServlet.java +++ b/core/src/test/java/org/apache/oozie/command/wf/HangServlet.java @@ -18,6 +18,8 @@ package org.apache.oozie.command.wf; +import org.apache.oozie.util.XLog; + import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; @@ -25,14 +27,27 @@ import javax.servlet.http.HttpServletResponse; import java.io.IOException; /** - * Servlet that 'hangs' for 200 ms. Used by TestNotificationXCommand + * Servlet that 'hangs' for some amount of time (200ms) by default. + * The time can be configured by setting {@link HangServlet#SLEEP_TIME_MS} as an init parameter for the servlet. */ public class HangServlet extends HttpServlet { + public static final String SLEEP_TIME_MS = "sleep_time_ms"; + protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { try { - Thread.sleep(200); + long time = 200; + String sleeptime = getInitParameter(SLEEP_TIME_MS); + if (sleeptime != null) { + try { + time = Long.parseLong(sleeptime); + } catch (NumberFormatException nfe) { + XLog.getLog(HangServlet.class).error("Invalid sleep time, using default (200)", nfe); + } + } + XLog.getLog(HangServlet.class).info("Sleeping for {0} ms", time); + Thread.sleep(time); } catch (Exception ex) { //NOP http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java index 5898d1a..a5128a8 100644 --- a/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java @@ -23,18 +23,17 @@ import java.io.Writer; import java.util.Date; import java.util.Map; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobID; import org.apache.hadoop.mapred.RunningJob; -import org.apache.oozie.ForTestingActionExecutor; import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.WorkflowJobBean; import org.apache.oozie.action.ActionExecutor; import org.apache.oozie.action.ActionExecutorException; -import org.apache.oozie.action.hadoop.LauncherMapperHelper; +import org.apache.oozie.action.hadoop.LauncherHelper; import org.apache.oozie.action.hadoop.MapReduceActionExecutor; import org.apache.oozie.action.hadoop.MapperReducerForTest; import org.apache.oozie.client.WorkflowAction; @@ -45,7 +44,6 @@ import org.apache.oozie.executor.jpa.JPAExecutorException; import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor; import org.apache.oozie.executor.jpa.WorkflowActionInsertJPAExecutor; import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor; -import org.apache.oozie.service.ActionCheckerService; import org.apache.oozie.service.ActionService; import org.apache.oozie.service.ConfigurationService; import org.apache.oozie.service.HadoopAccessorService; @@ -259,30 +257,25 @@ public class TestActionCheckXCommand extends XDataTestCase { ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(job, action, false, false); MapReduceActionExecutor actionExecutor = new MapReduceActionExecutor(); - JobConf conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action.getConf())); + Configuration conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action.getConf())); String user = conf.get("user.name"); JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf); String launcherId = action.getExternalId(); - final RunningJob launcherJob = jobClient.getJob(JobID.forName(launcherId)); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); - waitFor(120 * 1000, new Predicate() { - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - assertTrue(launcherJob.isSuccessful()); - Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), + Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(), conf); - assertTrue(LauncherMapperHelper.hasIdSwap(actionData)); + assertTrue(LauncherHelper.hasIdSwap(actionData)); new ActionCheckXCommand(action.getId()).call(); action = jpaService.execute(wfActionGetCmd); - String mapperId = action.getExternalId(); + String externalId = action.getExternalId(); String childId = action.getExternalChildIDs(); - assertTrue(launcherId.equals(mapperId)); + assertEquals("LauncherId", launcherId, externalId); + assertNotNull(childId); final RunningJob mrJob = jobClient.getJob(JobID.forName(childId)); @@ -297,7 +290,6 @@ public class TestActionCheckXCommand extends XDataTestCase { action = jpaService.execute(wfActionGetCmd); assertEquals("SUCCEEDED", action.getExternalStatus()); - } private static class ErrorCheckActionExecutor extends ActionExecutor { @@ -416,7 +408,7 @@ public class TestActionCheckXCommand extends XDataTestCase { ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(job2, action1, false, false); WorkflowActionBean action2 = jpaService.execute(wfActionGetCmd); MapReduceActionExecutor actionExecutor = new MapReduceActionExecutor(); - JobConf conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action2.getConf())); + Configuration conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action2.getConf())); String user = conf.get("user.name"); JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf); @@ -434,9 +426,9 @@ public class TestActionCheckXCommand extends XDataTestCase { } }); assertTrue(launcherJob.isSuccessful()); - Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), + Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(), conf); - assertTrue(LauncherMapperHelper.hasIdSwap(actionData)); + assertTrue(LauncherHelper.hasIdSwap(actionData)); new ActionCheckXCommand(actionId).call(); WorkflowActionBean action4 = jpaService.execute(wfActionGetCmd); @@ -488,7 +480,7 @@ public class TestActionCheckXCommand extends XDataTestCase { ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(job0, action1, false, false); MapReduceActionExecutor actionExecutor = new MapReduceActionExecutor(); - JobConf conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action1.getConf())); + Configuration conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action1.getConf())); String user = conf.get("user.name"); JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf); @@ -501,9 +493,9 @@ public class TestActionCheckXCommand extends XDataTestCase { } }); assertTrue(launcherJob.isSuccessful()); - Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), + Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(), conf); - assertTrue(LauncherMapperHelper.hasIdSwap(actionData)); + assertTrue(LauncherHelper.hasIdSwap(actionData)); new ActionCheckXCommand(action1.getId()).call(); WorkflowActionBean action2 = jpaService.execute(wfActionGetCmd); @@ -568,9 +560,9 @@ public class TestActionCheckXCommand extends XDataTestCase { }); assertTrue(launcherJob2.isSuccessful()); - actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), + actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(), conf); - assertTrue(LauncherMapperHelper.hasIdSwap(actionData)); + assertTrue(LauncherHelper.hasIdSwap(actionData)); new ActionCheckXCommand(actionId).call(); WorkflowActionBean action4 = jpaService.execute(wfActionGetCmd); http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java index ea90c08..80c5d54 100644 --- a/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java @@ -28,13 +28,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobID; -import org.apache.hadoop.mapred.RunningJob; import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.WorkflowJobBean; -import org.apache.oozie.action.hadoop.LauncherMapperHelper; +import org.apache.oozie.action.hadoop.LauncherHelper; import org.apache.oozie.action.hadoop.MapReduceActionExecutor; import org.apache.oozie.action.hadoop.MapperReducerForTest; import org.apache.oozie.client.OozieClient; @@ -50,7 +46,6 @@ import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQ import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor; import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; -import org.apache.oozie.service.HadoopAccessorService; import org.apache.oozie.service.InstrumentationService; import org.apache.oozie.service.JPAService; import org.apache.oozie.service.LiteWorkflowStoreService; @@ -162,23 +157,14 @@ public class TestActionStartXCommand extends XDataTestCase { ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(job, action, false, false); MapReduceActionExecutor actionExecutor = new MapReduceActionExecutor(); - JobConf conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action.getConf())); - String user = conf.get("user.name"); - JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf); + Configuration conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action.getConf())); String launcherId = action.getExternalId(); - final RunningJob launcherJob = jobClient.getJob(JobID.forName(launcherId)); - - waitFor(120 * 1000, new Predicate() { - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - assertTrue(launcherJob.isSuccessful()); - Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), + waitUntilYarnAppDoneAndAssertSuccess(launcherId); + Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(), conf); - assertTrue(LauncherMapperHelper.hasIdSwap(actionData)); + assertTrue(LauncherHelper.hasIdSwap(actionData)); } public void testActionStartToCheckRetry() throws Exception { @@ -238,26 +224,15 @@ public class TestActionStartXCommand extends XDataTestCase { ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(job, action, false, false); MapReduceActionExecutor actionExecutor = new MapReduceActionExecutor(); - JobConf conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action.getConf())); + Configuration conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action.getConf())); String user = conf.get("user.name"); - JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf); String launcherId = action.getExternalId(); - // retrieve launcher job - final RunningJob launcherJob = jobClient.getJob(JobID.forName(launcherId)); - - // time out after 120 seconds unless launcher job succeeds - waitFor(240 * 1000, new Predicate() { - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - // check if launcher job succeeds - assertTrue(launcherJob.isSuccessful()); - Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), + waitUntilYarnAppDoneAndAssertSuccess(launcherId); + Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(), conf); - assertTrue(LauncherMapperHelper.hasIdSwap(actionData)); + assertTrue(LauncherHelper.hasIdSwap(actionData)); } /** http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/command/wf/TestSubmitHiveXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestSubmitHiveXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestSubmitHiveXCommand.java index 43edf5e..98c94a7 100644 --- a/core/src/test/java/org/apache/oozie/command/wf/TestSubmitHiveXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/wf/TestSubmitHiveXCommand.java @@ -21,7 +21,7 @@ package org.apache.oozie.command.wf; import org.apache.hadoop.conf.Configuration; import org.apache.oozie.client.OozieClient; import org.apache.oozie.local.LocalOozie; -import org.apache.oozie.action.hadoop.MapReduceMain; +import org.apache.oozie.action.hadoop.ActionUtils; import org.apache.oozie.client.XOozieClient; import org.apache.oozie.test.XFsTestCase; import org.apache.oozie.util.XLog; @@ -45,7 +45,7 @@ public class TestSubmitHiveXCommand extends XFsTestCase { public void testWFXmlGeneration() throws Exception { Configuration conf = new Configuration(); - conf.set(XOozieClient.JT, "jobtracker"); + conf.set(XOozieClient.RM, "jobtracker"); conf.set(XOozieClient.NN, "namenode"); conf.set(OozieClient.LIBPATH, "libpath"); @@ -54,9 +54,9 @@ public class TestSubmitHiveXCommand extends XFsTestCase { String hiveArgsStr = "-a aaa -b bbb -c ccc -M -Da=aaa -Db=bbb -param input=abc"; String[] args = hiveArgsStr.split(" "); - MapReduceMain.setStrings(conf, XOozieClient.HIVE_OPTIONS, args); + ActionUtils.setStrings(conf, XOozieClient.HIVE_OPTIONS, args); String[] params = new String[]{"INPUT=/some/path", "OUTPUT=/some/other/path", "abc=xyz"}; - MapReduceMain.setStrings(conf, XOozieClient.HIVE_SCRIPT_PARAMS, params); + ActionUtils.setStrings(conf, XOozieClient.HIVE_SCRIPT_PARAMS, params); SubmitHiveXCommand submitHiveCmd = new SubmitHiveXCommand(conf); String xml = submitHiveCmd.getWorkflowXml(conf); http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/command/wf/TestSubmitMRXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestSubmitMRXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestSubmitMRXCommand.java index 5bc5747..388ff94 100644 --- a/core/src/test/java/org/apache/oozie/command/wf/TestSubmitMRXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/wf/TestSubmitMRXCommand.java @@ -51,7 +51,7 @@ public class TestSubmitMRXCommand extends XFsTestCase { public void testWFXmlGeneration() throws Exception { Configuration conf = new Configuration(false); - conf.set(XOozieClient.JT, "jobtracker"); + conf.set(XOozieClient.RM, "jobtracker"); conf.set(XOozieClient.NN, "namenode"); conf.set(OozieClient.LIBPATH, "libpath"); @@ -97,7 +97,7 @@ public class TestSubmitMRXCommand extends XFsTestCase { public void testWFXmlGenerationNegative1() throws Exception { Configuration conf = new Configuration(); - conf.set(XOozieClient.JT, "jobtracker"); + conf.set(XOozieClient.RM, "jobtracker"); conf.set(XOozieClient.NN, "namenode"); // conf.set(XOozieClient.LIBPATH, "libpath"); @@ -118,8 +118,8 @@ public class TestSubmitMRXCommand extends XFsTestCase { public void testWFXmlGenerationNewConfigProps() throws Exception { try { Configuration conf = new Configuration(false); - conf.set(XOozieClient.NN_2, "new_NN"); - conf.set(XOozieClient.JT_2, "new_JT"); + conf.set(XOozieClient.NN, "new_NN"); + conf.set(XOozieClient.RM, "new_JT"); conf.set("mapred.mapper.class", "TestMapper"); conf.set("mapred.reducer.class", "TestReducer"); conf.set("mapred.input.dir", "testInput"); http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/command/wf/TestSubmitPigXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestSubmitPigXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestSubmitPigXCommand.java index 5a1de25..c3cd1aa 100644 --- a/core/src/test/java/org/apache/oozie/command/wf/TestSubmitPigXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/wf/TestSubmitPigXCommand.java @@ -21,7 +21,7 @@ package org.apache.oozie.command.wf; import org.apache.hadoop.conf.Configuration; import org.apache.oozie.client.OozieClient; import org.apache.oozie.local.LocalOozie; -import org.apache.oozie.action.hadoop.MapReduceMain; +import org.apache.oozie.action.hadoop.ActionUtils; import org.apache.oozie.client.XOozieClient; import org.apache.oozie.test.XFsTestCase; import org.apache.oozie.util.XLog; @@ -46,7 +46,7 @@ public class TestSubmitPigXCommand extends XFsTestCase { public void testWFXmlGeneration1() throws Exception { Configuration conf = new Configuration(); - conf.set(XOozieClient.JT, "jobtracker"); + conf.set(XOozieClient.RM, "jobtracker"); conf.set(XOozieClient.NN, "namenode"); conf.set(OozieClient.LIBPATH, "libpath"); @@ -55,9 +55,9 @@ public class TestSubmitPigXCommand extends XFsTestCase { String pigArgsStr = "-a aaa -b bbb -c ccc -M -Da=aaa -Db=bbb -param input=abc"; String[] args = pigArgsStr.split(" "); - MapReduceMain.setStrings(conf, XOozieClient.PIG_OPTIONS, args); + ActionUtils.setStrings(conf, XOozieClient.PIG_OPTIONS, args); String[] params = new String[]{"INPUT=/some/path", "OUTPUT=/some/other/path", "abc=xyz"}; - MapReduceMain.setStrings(conf, XOozieClient.PIG_SCRIPT_PARAMS, params); + ActionUtils.setStrings(conf, XOozieClient.PIG_SCRIPT_PARAMS, params); SubmitPigXCommand submitPigCmd = new SubmitPigXCommand(conf); String xml = submitPigCmd.getWorkflowXml(conf); @@ -118,7 +118,7 @@ public class TestSubmitPigXCommand extends XFsTestCase { public void testWFXmlGeneration2() throws Exception { Configuration conf = new Configuration(); - conf.set(XOozieClient.JT, "jobtracker"); + conf.set(XOozieClient.RM, "jobtracker"); conf.set(XOozieClient.NN, "namenode"); conf.set(OozieClient.LIBPATH, "libpath"); @@ -128,7 +128,7 @@ public class TestSubmitPigXCommand extends XFsTestCase { String[] args = new String[2]; args[0] = "-a"; args[1] = "aaa bbb"; - MapReduceMain.setStrings(conf, XOozieClient.PIG_OPTIONS, args); + ActionUtils.setStrings(conf, XOozieClient.PIG_OPTIONS, args); SubmitPigXCommand submitPigCmd = new SubmitPigXCommand(conf); String xml = submitPigCmd.getWorkflowXml(conf); @@ -169,7 +169,7 @@ public class TestSubmitPigXCommand extends XFsTestCase { public void testWFXmlGenerationNegative1() throws Exception { Configuration conf = new Configuration(); - conf.set(XOozieClient.JT, "jobtracker"); + conf.set(XOozieClient.RM, "jobtracker"); conf.set(XOozieClient.NN, "namenode"); // conf.set(XOozieClient.LIBPATH, "libpath"); http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/command/wf/TestSubmitSqoopXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestSubmitSqoopXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestSubmitSqoopXCommand.java index 49b5028..f2f248a 100644 --- a/core/src/test/java/org/apache/oozie/command/wf/TestSubmitSqoopXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/wf/TestSubmitSqoopXCommand.java @@ -21,7 +21,7 @@ package org.apache.oozie.command.wf; import org.apache.hadoop.conf.Configuration; import org.apache.oozie.client.OozieClient; import org.apache.oozie.local.LocalOozie; -import org.apache.oozie.action.hadoop.MapReduceMain; +import org.apache.oozie.action.hadoop.ActionUtils; import org.apache.oozie.client.XOozieClient; import org.apache.oozie.test.XFsTestCase; import org.apache.oozie.util.XLog; @@ -46,7 +46,7 @@ public class TestSubmitSqoopXCommand extends XFsTestCase { public void testWFXmlGeneration() throws Exception { Configuration conf = new Configuration(); - conf.set(XOozieClient.JT, "jobtracker"); + conf.set(XOozieClient.RM, "jobtracker"); conf.set(XOozieClient.NN, "namenode"); conf.set(OozieClient.LIBPATH, "libpath"); @@ -54,7 +54,7 @@ public class TestSubmitSqoopXCommand extends XFsTestCase { String sqoopArgsStr = "-Da=aaa -Db=bbb"; String[] args = sqoopArgsStr.split(" "); - MapReduceMain.setStrings(conf, XOozieClient.SQOOP_OPTIONS, args); + ActionUtils.setStrings(conf, XOozieClient.SQOOP_OPTIONS, args); SubmitSqoopXCommand submitSqoopCmd = new SubmitSqoopXCommand(conf); String xml = submitSqoopCmd.getWorkflowXml(conf); http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java index 72f0114..ef75f14 100644 --- a/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java @@ -18,19 +18,18 @@ package org.apache.oozie.command.wf; -import java.io.StringReader; import java.net.URI; import java.util.Date; +import java.util.Set; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.examples.SleepJob; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobID; -import org.apache.hadoop.mapred.JobStatus; -import org.apache.hadoop.mapred.RunningJob; +import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.WorkflowJobBean; +import org.apache.oozie.action.hadoop.LauncherMain; import org.apache.oozie.action.hadoop.MapperReducerForTest; import org.apache.oozie.client.WorkflowAction; import org.apache.oozie.client.WorkflowJob; @@ -42,11 +41,10 @@ import org.apache.oozie.service.JPAService; import org.apache.oozie.service.Services; import org.apache.oozie.service.UUIDService; import org.apache.oozie.test.XDataTestCase; -import org.apache.oozie.test.XTestCase.Predicate; -import org.apache.oozie.util.XConfiguration; -import org.apache.oozie.util.XmlUtils; import org.apache.oozie.workflow.WorkflowInstance; +import com.google.common.collect.Sets; + public class TestWorkflowActionKillXCommand extends XDataTestCase { private Services services; @@ -117,26 +115,6 @@ public class TestWorkflowActionKillXCommand extends XDataTestCase { assertEquals(action.getExternalStatus(), "RUNNING"); } - public void testWfActionKillChildJob() throws Exception { - String externalJobID = launchSleepJob(1000); - String childId = launchSleepJob(1000000); - - WorkflowJobBean job = this.addRecordToWfJobTable(WorkflowJob.Status.KILLED, WorkflowInstance.Status.KILLED); - WorkflowActionBean action = this.addRecordToWfActionTable(job.getId(), externalJobID, "1", - WorkflowAction.Status.KILLED, childId); - - new ActionKillXCommand(action.getId()).call(); - JobClient jobClient = createJobClient(); - - final RunningJob mrJob = jobClient.getJob(JobID.forName(childId)); - waitFor(60 * 1000, new Predicate() { - public boolean evaluate() throws Exception { - return mrJob.isComplete(); - } - }); - assertEquals(mrJob.getJobState(), JobStatus.KILLED); - } - protected WorkflowActionBean addRecordToWfActionTable(String wfId, String externalJobID, String actionName, WorkflowAction.Status status, String childID) throws Exception { WorkflowActionBean action = new WorkflowActionBean(); @@ -189,9 +167,18 @@ public class TestWorkflowActionKillXCommand extends XDataTestCase { SleepJob sleepjob = new SleepJob(); sleepjob.setConf(jobConf); jobConf = sleepjob.setupJobConf(1, 1, sleep, 1, sleep, 1); + jobConf.set(LauncherMain.CHILD_MAPREDUCE_JOB_TAGS, "sleepjob"); + jobConf.set(LauncherMain.MAPREDUCE_JOB_TAGS, "sleepjob"); + System.setProperty(LauncherMain.OOZIE_JOB_LAUNCH_TIME, String.valueOf(System.currentTimeMillis())); + + jobClient.submitJob(jobConf); + Set<ApplicationId> apps = Sets.newHashSet(); + apps = LauncherMain.getChildYarnJobs(jobConf, ApplicationsRequestScope.ALL); + assertEquals("Number of YARN apps", apps.size(), 1); + + sleepjob.close(); - final RunningJob runningJob = jobClient.submitJob(jobConf); - return runningJob.getID().toString(); + return apps.iterator().next().toString(); } } http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/service/TestConfigurationService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestConfigurationService.java b/core/src/test/java/org/apache/oozie/service/TestConfigurationService.java index 3c6525d..5957ad6 100644 --- a/core/src/test/java/org/apache/oozie/service/TestConfigurationService.java +++ b/core/src/test/java/org/apache/oozie/service/TestConfigurationService.java @@ -19,7 +19,7 @@ package org.apache.oozie.service; import org.apache.hadoop.conf.Configuration; -import org.apache.oozie.action.hadoop.CredentialsProvider; +import org.apache.oozie.action.hadoop.CredentialsProviderFactory; import org.apache.oozie.action.hadoop.DistcpActionExecutor; import org.apache.oozie.action.hadoop.JavaActionExecutor; import org.apache.oozie.action.hadoop.LauncherMapper; @@ -210,13 +210,10 @@ public class TestConfigurationService extends XTestCase { assertEquals(2048, ConfigurationService.getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA)); assertEquals("http://0.0.0.0:11000/oozie?job=", ConfigurationService.get(JobXCommand.CONF_CONSOLE_URL)); - assertEquals(true, ConfigurationService.getBoolean(JavaActionExecutor.CONF_HADOOP_YARN_UBER_MODE)); - assertEquals(false, ConfigurationService.getBoolean( - "oozie.action.shell.launcher." + JavaActionExecutor.HADOOP_YARN_UBER_MODE)); assertEquals(false, ConfigurationService.getBoolean(HadoopAccessorService.KERBEROS_AUTH_ENABLED)); assertEquals(0, ConfigurationService.getStrings("no.defined").length); - assertEquals(0, ConfigurationService.getStrings(CredentialsProvider.CRED_KEY).length); + assertEquals(0, ConfigurationService.getStrings(CredentialsProviderFactory.CRED_KEY).length); assertEquals(1, ConfigurationService.getStrings(DistcpActionExecutor.CLASS_NAMES).length); assertEquals("distcp=org.apache.hadoop.tools.DistCp", ConfigurationService.getStrings(DistcpActionExecutor.CLASS_NAMES)[0]); http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/service/TestHadoopAccessorService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestHadoopAccessorService.java b/core/src/test/java/org/apache/oozie/service/TestHadoopAccessorService.java index bbca479..a1ee004 100644 --- a/core/src/test/java/org/apache/oozie/service/TestHadoopAccessorService.java +++ b/core/src/test/java/org/apache/oozie/service/TestHadoopAccessorService.java @@ -18,7 +18,15 @@ package org.apache.oozie.service; -import org.apache.oozie.test.XTestCase; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.security.authorize.AuthorizationException; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.oozie.test.XFsTestCase; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.fs.FileSystem; @@ -34,7 +42,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.oozie.ErrorCode; import org.apache.oozie.util.XConfiguration; -public class TestHadoopAccessorService extends XTestCase { +public class TestHadoopAccessorService extends XFsTestCase { protected void setUp() throws Exception { super.setUp(); @@ -140,49 +148,83 @@ public class TestHadoopAccessorService extends XTestCase { */ assertEquals("100", conf.get("action.testprop")); assertEquals("1", conf.get("default.testprop")); - - // Check that properties load correctly assertEquals("org.apache.log4j.ConsoleAppender", conf.get("log4j.appender.oozie")); assertEquals("NONE, null", conf.get("log4j.logger.a")); + } + + public void testCreateJobClient() throws Exception { + HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); + JobConf conf = has.createJobConf(getJobTrackerUri()); + + JobClient jc = has.createJobClient(getTestUser(), conf); + assertNotNull(jc); + jc.getAllJobs(); + JobConf conf2 = new JobConf(false); + conf2.set("mapred.job.tracker", getJobTrackerUri()); + try { + has.createJobClient(getTestUser(), conf2); + fail("Should have thrown exception because Configuration not created by HadoopAccessorService"); + } + catch (HadoopAccessorException ex) { + assertEquals(ErrorCode.E0903, ex.getErrorCode()); + } } - public void testAccessor() throws Exception { - Services services = Services.get(); - HadoopAccessorService has = services.get(HadoopAccessorService.class); + public void testCreateYarnClient() throws Exception { + HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); JobConf conf = has.createJobConf(getJobTrackerUri()); - conf.set("mapred.job.tracker", getJobTrackerUri()); - conf.set("fs.default.name", getNameNodeUri()); - URI uri = new URI(getNameNodeUri()); + YarnClient yc = has.createYarnClient(getTestUser(), conf); + assertNotNull(yc); + yc.getApplications(); - //valid user - String user = getTestUser(); - String group = getTestGroup(); + try { + yc = has.createYarnClient("invalid-user", conf); + assertNotNull(yc); + yc.getApplications(); + fail("Should have thrown exception because not allowed to impersonate 'invalid-user'"); + } + catch (AuthorizationException ex) { + } - JobClient jc = has.createJobClient(user, conf); - assertNotNull(jc); - FileSystem fs = has.createFileSystem(user, new URI(getNameNodeUri()), conf); - assertNotNull(fs); - fs = has.createFileSystem(user, uri, conf); - assertNotNull(fs); + JobConf conf2 = new JobConf(false); + conf2.set("yarn.resourcemanager.address", getJobTrackerUri()); + try { + has.createYarnClient(getTestUser(), conf2); + fail("Should have thrown exception because Configuration not created by HadoopAccessorService"); + } + catch (HadoopAccessorException ex) { + assertEquals(ErrorCode.E0903, ex.getErrorCode()); + } + } - //invalid user + public void testCreateFileSystem() throws Exception { + HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); + JobConf conf = has.createJobConf(getJobTrackerUri()); - user = "invalid"; + FileSystem fs = has.createFileSystem(getTestUser(), new URI(getNameNodeUri()), conf); + assertNotNull(fs); + fs.exists(new Path(getNameNodeUri(), "/foo")); try { - has.createJobClient(user, conf); - fail(); + fs = has.createFileSystem("invalid-user", new URI(getNameNodeUri()), conf); + assertNotNull(fs); + fs.exists(new Path(getNameNodeUri(), "/foo")); + fail("Should have thrown exception because not allowed to impersonate 'invalid-user'"); } - catch (Throwable ex) { + catch (RemoteException ex) { + assertEquals(AuthorizationException.class.getName(), ex.getClassName()); } + JobConf conf2 = new JobConf(false); + conf2.set("fs.default.name", getNameNodeUri()); try { - has.createFileSystem(user, uri, conf); - fail(); + has.createFileSystem(getTestUser(), new URI(getNameNodeUri()), conf2); + fail("Should have thrown exception because Configuration not created by HadoopAccessorService"); } - catch (Throwable ex) { + catch (HadoopAccessorException ex) { + assertEquals(ErrorCode.E0903, ex.getErrorCode()); } } @@ -190,7 +232,7 @@ public class TestHadoopAccessorService extends XTestCase { HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); JobConf jobConf = new JobConf(false); assertEquals(new Text("oozie mr token"), has.getMRTokenRenewerInternal(jobConf)); - jobConf.set("mapred.job.tracker", "localhost:50300"); + jobConf.set("yarn.resourcemanager.address", "localhost:50300"); jobConf.set("mapreduce.jobtracker.kerberos.principal", "mapred/_h...@kdc.domain.com"); assertEquals(new Text("mapred/localh...@kdc.domain.com"), has.getMRTokenRenewerInternal(jobConf)); jobConf = new JobConf(false); @@ -298,4 +340,21 @@ public class TestHadoopAccessorService extends XTestCase { } has.destroy(); } + + public void testCreateLocalResourceForConfigurationFile() throws Exception { + HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); + String filename = "foo.xml"; + Configuration conf = has.createJobConf(getNameNodeUri()); + conf.set("foo", "bar"); + LocalResource lRes = has.createLocalResourceForConfigurationFile(filename, getTestUser(), conf, getFileSystem().getUri(), + getFsTestCaseDir()); + assertNotNull(lRes); + assertEquals(LocalResourceType.FILE, lRes.getType()); + assertEquals(LocalResourceVisibility.APPLICATION, lRes.getVisibility()); + Path resPath = ConverterUtils.getPathFromYarnURL(lRes.getResource()); + assertEquals(new Path(getFsTestCaseDir(), "foo.xml"), resPath); + Configuration conf2 = new Configuration(false); + conf2.addResource(getFileSystem().open(resPath)); + assertEquals("bar", conf2.get("foo")); + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java b/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java index 8fd0c2d..ce04c6d 100644 --- a/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java +++ b/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java @@ -21,10 +21,6 @@ package org.apache.oozie.service; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobID; -import org.apache.hadoop.mapred.RunningJob; import org.apache.oozie.BundleActionBean; import org.apache.oozie.BundleJobBean; import org.apache.oozie.CoordinatorActionBean; @@ -34,7 +30,7 @@ import org.apache.oozie.DagEngine; import org.apache.oozie.ForTestingActionExecutor; import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.WorkflowJobBean; -import org.apache.oozie.action.hadoop.LauncherMapperHelper; +import org.apache.oozie.action.hadoop.LauncherHelper; import org.apache.oozie.action.hadoop.MapReduceActionExecutor; import org.apache.oozie.action.hadoop.MapperReducerForTest; import org.apache.oozie.client.CoordinatorAction; @@ -249,24 +245,14 @@ public class TestRecoveryService extends XDataTestCase { ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(job1, action1, false, false); MapReduceActionExecutor actionExecutor = new MapReduceActionExecutor(); - JobConf conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action1.getConf())); - String user = conf.get("user.name"); - String group = conf.get("group.name"); - JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf); - + Configuration conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action1.getConf())); String launcherId = action1.getExternalId(); - final RunningJob launcherJob = jobClient.getJob(JobID.forName(launcherId)); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); - waitFor(240 * 1000, new Predicate() { - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - assertTrue(launcherJob.isSuccessful()); - Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), + Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(), conf); - assertTrue(LauncherMapperHelper.hasIdSwap(actionData)); + assertTrue(LauncherHelper.hasIdSwap(actionData)); } /** @@ -274,10 +260,8 @@ public class TestRecoveryService extends XDataTestCase { * @throws Exception */ public void testBundleRecoveryCoordCreate() throws Exception { - final BundleActionBean bundleAction; - final BundleJobBean bundle; - bundle = addRecordToBundleJobTable(Job.Status.RUNNING, false); - bundleAction = addRecordToBundleActionTable(bundle.getId(), "coord1", 1, Job.Status.PREP); + final BundleJobBean bundle = addRecordToBundleJobTable(Job.Status.RUNNING, false); + addRecordToBundleActionTable(bundle.getId(), "coord1", 1, Job.Status.PREP); final JPAService jpaService = Services.get().get(JPAService.class); sleep(3000); @@ -290,7 +274,7 @@ public class TestRecoveryService extends XDataTestCase { jpaService.execute(new BundleActionGetJPAExecutor(bundle.getId(), "coord1")); try { if (mybundleAction.getCoordId() != null) { - CoordinatorJobBean coord = jpaService.execute(new CoordJobGetJPAExecutor(mybundleAction.getCoordId())); + jpaService.execute(new CoordJobGetJPAExecutor(mybundleAction.getCoordId())); return true; } } catch (Exception e) { @@ -345,12 +329,11 @@ public class TestRecoveryService extends XDataTestCase { * @throws Exception */ public void testBundleRecoveryCoordExists() throws Exception { - final BundleActionBean bundleAction; final BundleJobBean bundle; final CoordinatorJob coord; bundle = addRecordToBundleJobTable(Job.Status.RUNNING, false); coord = addRecordToCoordJobTable(Job.Status.PREP, false, false); - bundleAction = addRecordToBundleActionTable(bundle.getId(), coord.getId(), "coord1", 1, Job.Status.PREP); + addRecordToBundleActionTable(bundle.getId(), coord.getId(), "coord1", 1, Job.Status.PREP); final JPAService jpaService = Services.get().get(JPAService.class); sleep(3000);