http://git-wip-us.apache.org/repos/asf/oozie/blob/f5554dd3/core/src/main/java/org/apache/oozie/servlet/V1JobsServlet.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/servlet/V1JobsServlet.java b/core/src/main/java/org/apache/oozie/servlet/V1JobsServlet.java index 80c8ec4..9473821 100644 --- a/core/src/main/java/org/apache/oozie/servlet/V1JobsServlet.java +++ b/core/src/main/java/org/apache/oozie/servlet/V1JobsServlet.java @@ -30,27 +30,25 @@ import javax.servlet.http.HttpServletResponse; import org.apache.hadoop.conf.Configuration; import org.apache.oozie.BaseEngineException; import org.apache.oozie.BulkResponseInfo; -import org.apache.oozie.BundleJobBean; +import org.apache.oozie.BundleEngine; +import org.apache.oozie.BundleEngineException; import org.apache.oozie.BundleJobInfo; import org.apache.oozie.CoordinatorEngine; -import org.apache.oozie.BundleEngine; import org.apache.oozie.CoordinatorEngineException; -import org.apache.oozie.BundleEngineException; -import org.apache.oozie.CoordinatorJobBean; import org.apache.oozie.CoordinatorJobInfo; import org.apache.oozie.DagEngine; import org.apache.oozie.DagEngineException; import org.apache.oozie.ErrorCode; -import org.apache.oozie.WorkflowJobBean; +import org.apache.oozie.OozieJsonFactory; import org.apache.oozie.WorkflowsInfo; import org.apache.oozie.cli.OozieCLI; import org.apache.oozie.client.OozieClient; import org.apache.oozie.client.rest.BulkResponseImpl; import org.apache.oozie.client.rest.JsonTags; import org.apache.oozie.client.rest.RestConstants; +import org.apache.oozie.service.BundleEngineService; import org.apache.oozie.service.CoordinatorEngineService; import org.apache.oozie.service.DagEngineService; -import org.apache.oozie.service.BundleEngineService; import org.apache.oozie.service.Services; import org.apache.oozie.util.XLog; import org.apache.oozie.util.XmlUtils; @@ -308,12 +306,12 @@ public class V1JobsServlet extends 
BaseJobsServlet { * request object */ private JSONObject getWorkflowJobs(HttpServletRequest request) throws XServletException { - JSONObject json = new JSONObject(); + JSONObject json; try { String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM); String startStr = request.getParameter(RestConstants.OFFSET_PARAM); String lenStr = request.getParameter(RestConstants.LEN_PARAM); - String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null + String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM); int start = (startStr != null) ? Integer.parseInt(startStr) : 1; start = (start < 1) ? 1 : start; @@ -321,12 +319,7 @@ public class V1JobsServlet extends BaseJobsServlet { len = (len < 1) ? 50 : len; DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); WorkflowsInfo jobs = dagEngine.getJobs(filter, start, len); - List<WorkflowJobBean> jsonWorkflows = jobs.getWorkflows(); - json.put(JsonTags.WORKFLOWS_JOBS, WorkflowJobBean.toJSONArray(jsonWorkflows, timeZoneId)); - json.put(JsonTags.WORKFLOWS_TOTAL, jobs.getTotal()); - json.put(JsonTags.WORKFLOWS_OFFSET, jobs.getStart()); - json.put(JsonTags.WORKFLOWS_LEN, jobs.getLen()); - + json = OozieJsonFactory.getWFJSONObject(jobs, timeZoneId); } catch (DagEngineException ex) { throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); @@ -341,12 +334,12 @@ public class V1JobsServlet extends BaseJobsServlet { */ @SuppressWarnings("unchecked") private JSONObject getCoordinatorJobs(HttpServletRequest request) throws XServletException { - JSONObject json = new JSONObject(); + JSONObject json; try { String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM); String startStr = request.getParameter(RestConstants.OFFSET_PARAM); String lenStr = request.getParameter(RestConstants.LEN_PARAM); - String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) 
== null + String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM); int start = (startStr != null) ? Integer.parseInt(startStr) : 1; start = (start < 1) ? 1 : start; @@ -355,12 +348,7 @@ public class V1JobsServlet extends BaseJobsServlet { CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( getUser(request)); CoordinatorJobInfo jobs = coordEngine.getCoordJobs(filter, start, len); - List<CoordinatorJobBean> jsonJobs = jobs.getCoordJobs(); - json.put(JsonTags.COORDINATOR_JOBS, CoordinatorJobBean.toJSONArray(jsonJobs, timeZoneId)); - json.put(JsonTags.COORD_JOB_TOTAL, jobs.getTotal()); - json.put(JsonTags.COORD_JOB_OFFSET, jobs.getStart()); - json.put(JsonTags.COORD_JOB_LEN, jobs.getLen()); - + json = OozieJsonFactory.getCoordJSONObject(jobs, timeZoneId); } catch (CoordinatorEngineException ex) { throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); @@ -370,12 +358,12 @@ public class V1JobsServlet extends BaseJobsServlet { @SuppressWarnings("unchecked") private JSONObject getBundleJobs(HttpServletRequest request) throws XServletException { - JSONObject json = new JSONObject(); + JSONObject json; try { String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM); String startStr = request.getParameter(RestConstants.OFFSET_PARAM); String lenStr = request.getParameter(RestConstants.LEN_PARAM); - String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null + String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM); int start = (startStr != null) ? Integer.parseInt(startStr) : 1; start = (start < 1) ? 
1 : start; @@ -384,13 +372,7 @@ public class V1JobsServlet extends BaseJobsServlet { BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request)); BundleJobInfo jobs = bundleEngine.getBundleJobs(filter, start, len); - List<BundleJobBean> jsonJobs = jobs.getBundleJobs(); - - json.put(JsonTags.BUNDLE_JOBS, BundleJobBean.toJSONArray(jsonJobs, timeZoneId)); - json.put(JsonTags.BUNDLE_JOB_TOTAL, jobs.getTotal()); - json.put(JsonTags.BUNDLE_JOB_OFFSET, jobs.getStart()); - json.put(JsonTags.BUNDLE_JOB_LEN, jobs.getLen()); - + json = OozieJsonFactory.getBundleJSONObject(jobs, timeZoneId); } catch (BundleEngineException ex) { throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); @@ -505,110 +487,76 @@ public class V1JobsServlet extends BaseJobsServlet { int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50; len = (len < 1) ? 50 : len; - JSONObject json = new JSONObject(); + JSONObject json; List<String> ids = new ArrayList<String>(); if (jobType.equals("wf")) { - WorkflowsInfo jobs = null; + WorkflowsInfo jobs; DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); - if (action.equals(RestConstants.JOB_ACTION_KILL)) { - try { - jobs = dagEngine.killJobs(filter, start, len); - } - catch (DagEngineException ex) { - throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); - } - } else if (action.equals(RestConstants.JOB_ACTION_SUSPEND)) { - try { - jobs = dagEngine.suspendJobs(filter, start, len); - } - catch (DagEngineException ex) { - throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); - } - } else if (action.equals(RestConstants.JOB_ACTION_RESUME)) { - try { - jobs = dagEngine.resumeJobs(filter, start, len); - } - catch (DagEngineException ex) { - throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); + try { + switch (action) { + case RestConstants.JOB_ACTION_KILL: + jobs = dagEngine.killJobs(filter, start, len); + break; + 
case RestConstants.JOB_ACTION_SUSPEND: + jobs = dagEngine.suspendJobs(filter, start, len); + break; + case RestConstants.JOB_ACTION_RESUME: + jobs = dagEngine.resumeJobs(filter, start, len); + break; + default: + throw new DagEngineException(ErrorCode.E0301, action); } + } catch (DagEngineException ex) { + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); } - - json.put(JsonTags.WORKFLOWS_JOBS, WorkflowJobBean.toJSONArray(jobs.getWorkflows(), timeZoneId)); - json.put(JsonTags.WORKFLOWS_TOTAL, jobs.getTotal()); - json.put(JsonTags.WORKFLOWS_OFFSET, jobs.getStart()); - json.put(JsonTags.WORKFLOWS_LEN, jobs.getLen()); - + json = OozieJsonFactory.getWFJSONObject(jobs, timeZoneId); } else if (jobType.equals("bundle")) { - BundleJobInfo jobs = null; - BundleEngine bundleEngine = Services.get().get(BundleEngineService.class). - getBundleEngine(getUser(request)); - if (action.equals(RestConstants.JOB_ACTION_KILL)) { - try { - jobs = bundleEngine.killJobs(filter, start, len); - } - catch (BundleEngineException ex) { - throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); - } - } - else if (action.equals(RestConstants.JOB_ACTION_SUSPEND)) { - try { - jobs = bundleEngine.suspendJobs(filter, start, len); - } - catch (BundleEngineException ex) { - throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); - } - } - else if (action.equals(RestConstants.JOB_ACTION_RESUME)) { - try { - jobs = bundleEngine.resumeJobs(filter, start, len); - } - catch (BundleEngineException ex) { - throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); + BundleJobInfo jobs; + BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request)); + try { + switch (action) { + case RestConstants.JOB_ACTION_KILL: + jobs = bundleEngine.killJobs(filter, start, len); + break; + case RestConstants.JOB_ACTION_SUSPEND: + jobs = bundleEngine.suspendJobs(filter, start, len); + break; + case 
RestConstants.JOB_ACTION_RESUME: + jobs = bundleEngine.resumeJobs(filter, start, len); + break; + default: + throw new BundleEngineException(ErrorCode.E0301, action); } + } catch (BundleEngineException ex) { + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); } - - json.put(JsonTags.BUNDLE_JOBS, BundleJobBean.toJSONArray(jobs.getBundleJobs(), timeZoneId)); - json.put(JsonTags.BUNDLE_JOB_TOTAL, jobs.getTotal()); - json.put(JsonTags.BUNDLE_JOB_OFFSET, jobs.getStart()); - json.put(JsonTags.BUNDLE_JOB_LEN, jobs.getLen()); + json = OozieJsonFactory.getBundleJSONObject(jobs, timeZoneId); } else { - CoordinatorJobInfo jobs = null; + CoordinatorJobInfo jobs; CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class). getCoordinatorEngine(getUser(request)); - if (action.equals(RestConstants.JOB_ACTION_KILL)) { - try { - jobs = coordEngine.killJobs(filter, start, len); - } - catch (CoordinatorEngineException ex) { - throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); - } - } - else if (action.equals(RestConstants.JOB_ACTION_SUSPEND)) { - try { - jobs = coordEngine.suspendJobs(filter, start, len); - } - catch (CoordinatorEngineException ex) { - throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); + try { + switch (action) { + case RestConstants.JOB_ACTION_KILL: + jobs = coordEngine.killJobs(filter, start, len); + break; + case RestConstants.JOB_ACTION_SUSPEND: + jobs = coordEngine.suspendJobs(filter, start, len); + break; + case RestConstants.JOB_ACTION_RESUME: + jobs = coordEngine.resumeJobs(filter, start, len); + break; + default: + throw new CoordinatorEngineException(ErrorCode.E0301, action); } + } catch (CoordinatorEngineException ex) { + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); } - else if (action.equals(RestConstants.JOB_ACTION_RESUME)) { - try { - jobs = coordEngine.resumeJobs(filter, start, len); - } - catch (CoordinatorEngineException ex) { - throw new 
XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); - } - } - - json.put(JsonTags.COORDINATOR_JOBS, CoordinatorJobBean.toJSONArray(jobs.getCoordJobs(), timeZoneId)); - json.put(JsonTags.COORD_JOB_TOTAL, jobs.getTotal()); - json.put(JsonTags.COORD_JOB_OFFSET, jobs.getStart()); - json.put(JsonTags.COORD_JOB_LEN, jobs.getLen()); + json = OozieJsonFactory.getCoordJSONObject(jobs, timeZoneId); } - json.put(JsonTags.JOB_IDS, toJSONArray(ids)); return json; }
http://git-wip-us.apache.org/repos/asf/oozie/blob/f5554dd3/core/src/test/java/org/apache/oozie/TestLocalOozieClientCoord.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/TestLocalOozieClientCoord.java b/core/src/test/java/org/apache/oozie/TestLocalOozieClientCoord.java index 4decd52..177bff7 100644 --- a/core/src/test/java/org/apache/oozie/TestLocalOozieClientCoord.java +++ b/core/src/test/java/org/apache/oozie/TestLocalOozieClientCoord.java @@ -28,12 +28,16 @@ import java.util.List; import java.util.NoSuchElementException; import java.util.Properties; +import org.apache.commons.lang.StringUtils; import org.apache.oozie.client.CoordinatorJob; +import org.apache.oozie.client.Job; import org.apache.oozie.client.OozieClient; import org.apache.oozie.client.OozieClientException; import org.apache.oozie.local.LocalOozie; import org.apache.oozie.service.Services; import org.apache.oozie.test.XDataTestCase; +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; public class TestLocalOozieClientCoord extends XDataTestCase { @@ -81,7 +85,7 @@ public class TestLocalOozieClientCoord extends XDataTestCase { public void testHeaderMethods() { OozieClient client = LocalOozie.getCoordClient(); client.setHeader("h", "v"); - assertNull(client.getHeader("h")); + assertTrue("no-op, should be null/empty", StringUtils.isBlank(client.getHeader("h"))); Iterator<String> hit = client.getHeaderNames(); assertFalse(hit.hasNext()); try { @@ -92,7 +96,7 @@ public class TestLocalOozieClientCoord extends XDataTestCase { // expected } client.removeHeader("h"); - assertNull(client.getHeader("h")); + assertTrue("no-op, should be null/empty", StringUtils.isBlank(client.getHeader("h"))); } public void testGetJobsInfo() { @@ -192,4 +196,68 @@ public class TestLocalOozieClientCoord extends XDataTestCase { List<CoordinatorJob> list = client.getCoordJobsInfo("", 1, 5); assertEquals(2, list.size()); } + + public void 
testJobsOperations() throws Exception { + final OozieClient client = LocalOozie.getCoordClient(); + + // Just in case, check that there are no Coord job records left by previous tests: + List<CoordinatorJob> list0 = client.getCoordJobsInfo("", 1, 100); + assertEquals(0, list0.size()); + Properties conf = client.createConfiguration(); + String appPath = storedCoordAppPath(); + + conf.setProperty(OozieClient.COORDINATOR_APP_PATH, appPath); + final String jobId0 = client.run(conf); + final String jobId1 = client.run(conf); + final String jobId2 = client.run(conf); + waitFor(client, jobId0); + waitFor(client, jobId1); + waitFor(client, jobId2); + list0 = client.getCoordJobsInfo("name=NAME", 1, 10); + assertEquals(3, list0.size()); + + JSONObject jsonObject = client.suspendJobs("name=NAME", "coord", 1, 3); + assertEquals(3, jsonObject.get("total")); + assertEquals(3, ((JSONArray) jsonObject.get("coordinatorjobs")).size()); + assertEquals(Job.Status.SUSPENDED, client.getCoordJobInfo(jobId0).getStatus()); + assertEquals(Job.Status.SUSPENDED, client.getCoordJobInfo(jobId1).getStatus()); + assertEquals(Job.Status.SUSPENDED, client.getCoordJobInfo(jobId2).getStatus()); + + jsonObject = client.resumeJobs("name=NAME", "coord", 1, 3); + assertEquals(3, jsonObject.get("total")); + assertEquals(3, ((JSONArray) jsonObject.get("coordinatorjobs")).size()); + assertEquals(Job.Status.RUNNING, client.getCoordJobInfo(jobId0).getStatus()); + assertEquals(Job.Status.RUNNING, client.getCoordJobInfo(jobId1).getStatus()); + assertEquals(Job.Status.RUNNING, client.getCoordJobInfo(jobId2).getStatus()); + + jsonObject = client.killJobs("name=NAME", "coord", 1, 3); + assertEquals(3, jsonObject.get("total")); + assertEquals(3, ((JSONArray) jsonObject.get("coordinatorjobs")).size()); + assertEquals(Job.Status.KILLED, client.getCoordJobInfo(jobId0).getStatus()); + assertEquals(Job.Status.KILLED, client.getCoordJobInfo(jobId1).getStatus()); + assertEquals(Job.Status.KILLED, 
client.getCoordJobInfo(jobId2).getStatus()); + } + + private void waitFor(final OozieClient client, final String jobId) { + waitFor(10 * 1000, new Predicate() { + @Override + public boolean evaluate() throws Exception { + Job.Status status = client.getCoordJobInfo(jobId).getStatus(); + return status != Job.Status.PREP; + } + }); + } + + private String storedCoordAppPath() throws Exception { + String appPath = getTestCaseFileUri("coordinator.xml"); + String appXml = "<coordinator-app name=\"NAME\" frequency=\"${coord:minutes(20)}\" " + + "start=\"2009-02-01T01:00Z\" end=\"2009-02-01T03:00Z\" timezone=\"UTC\" " + + "xmlns=\"uri:oozie:coordinator:0.1\"> <controls> <timeout>10</timeout> <concurrency>1</concurrency> " + + "<execution>LIFO</execution> </controls> " + + " <action> <workflow> " + + "<app-path>hdfs:///tmp/workflows/</app-path> " + +" </workflow> </action> </coordinator-app>"; + writeToFile(appXml, appPath); + return appPath; + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/f5554dd3/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index d998b2d..cb141cb 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.4.0 release (trunk - unreleased) +OOZIE-2751 LocalOozieClient is missing methods from OozieClient (abhishekbafna via rkanter) OOZIE-2870 non working examples in oozie documentation coordinator spec (andras.piros via pbacsko) OOZIE-2827 amend More directly view of the coordinator’s history from perspective of workflow action. (Alonzo Zhou via pbacsko) OOZIE-2851 spelling mistakes in examples (Artem Ervits via gezapeti)