OOZIE-2827 More directly view of the coordinator's history from perspective of workflow action. (Alonzo Zhou via pbacsko)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/40618313 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/40618313 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/40618313 Branch: refs/heads/master Commit: 4061831378daca80191e5abe296c3ee75e79c8bd Parents: 0e1a000 Author: Peter Bacsko <pbac...@cloudera.com> Authored: Wed Apr 19 13:05:36 2017 +0200 Committer: Peter Bacsko <pbac...@cloudera.com> Committed: Wed Apr 19 13:05:36 2017 +0200 ---------------------------------------------------------------------- .../oozie/client/CoordinatorWfAction.java | 48 + .../org/apache/oozie/client/rest/JsonTags.java | 5 + .../apache/oozie/client/rest/RestConstants.java | 4 + .../org/apache/oozie/CoordinatorEngine.java | 20 + .../apache/oozie/CoordinatorEngine.java.orig | 966 +++++++++++++++++++ .../apache/oozie/CoordinatorWfActionBean.java | 86 ++ .../main/java/org/apache/oozie/ErrorCode.java | 1 + .../coord/CoordWfActionInfoXCommand.java | 144 +++ .../jpa/WorkflowActionGetJPAExecutor.java | 16 +- .../apache/oozie/servlet/BaseJobServlet.java | 22 + .../org/apache/oozie/servlet/V2JobServlet.java | 45 + .../org/apache/oozie/client/TestOozieCLI.java | 38 +- .../coord/TestCoordWfActionInfoXCommand.java | 174 ++++ .../servlet/MockCoordinatorEngineService.java | 55 +- .../apache/oozie/servlet/TestV2JobServlet.java | 189 ++++ release-log.txt | 1 + 16 files changed, 1782 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/40618313/client/src/main/java/org/apache/oozie/client/CoordinatorWfAction.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/oozie/client/CoordinatorWfAction.java b/client/src/main/java/org/apache/oozie/client/CoordinatorWfAction.java new file mode 100644 index 0000000..056981d --- /dev/null +++ 
b/client/src/main/java/org/apache/oozie/client/CoordinatorWfAction.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.client; + +import java.text.MessageFormat; + +public interface CoordinatorWfAction{ + + enum NullReason{ + + ACTION_NULL("Could not get workflow action, no action named {0} in workflow {1}"), + PARENT_NULL("Could not get workflow action, workflow instance is null"),; + + private String template; + + NullReason(String template) { + this.template = template; + } + + public String getNullReason(Object... 
args) { + return MessageFormat.format(template, args); + } + + } + + int getActionNumber(); + + WorkflowAction getAction(); + + String getNullReason(); + +} http://git-wip-us.apache.org/repos/asf/oozie/blob/40618313/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java b/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java index ca168e0..7b6f50c 100644 --- a/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java +++ b/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java @@ -143,6 +143,11 @@ public interface JsonTags { String COORDINATOR_ACTION_DATASETS = "dataSets"; String COORDINATOR_ACTION_DATASET = "dataSet"; + String COORDINATOR_WF_ACTION_NUMBER = "actionNumber"; + String COORDINATOR_WF_ACTION = "action"; + String COORDINATOR_WF_ACTION_NULL_REASON = "nullReason"; + String COORDINATOR_WF_ACTIONS = "actions"; + String BUNDLE_JOB_ID = "bundleJobId"; String BUNDLE_JOB_NAME = "bundleJobName"; String BUNDLE_JOB_PATH = "bundleJobPath"; http://git-wip-us.apache.org/repos/asf/oozie/blob/40618313/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java b/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java index 8ddb1f8..f477531 100644 --- a/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java +++ b/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java @@ -49,6 +49,8 @@ public interface RestConstants { String ORDER_PARAM = "order"; + String ACTION_NAME_PARAM = "action-name"; + String JOB_FILTER_PARAM = "filter"; String JOB_RESOURCE = "/job"; @@ -101,6 +103,8 @@ public interface RestConstants { String JOB_SHOW_STATUS = "status"; + String JOB_SHOW_WF_ACTIONS_IN_COORD = 
"wf-actions"; + String JOB_BUNDLE_RERUN_COORD_SCOPE_PARAM = "coord-scope"; String JOB_BUNDLE_RERUN_DATE_SCOPE_PARAM = "date-scope"; http://git-wip-us.apache.org/repos/asf/oozie/blob/40618313/core/src/main/java/org/apache/oozie/CoordinatorEngine.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/CoordinatorEngine.java b/core/src/main/java/org/apache/oozie/CoordinatorEngine.java index 2c04bea..85de6f9 100644 --- a/core/src/main/java/org/apache/oozie/CoordinatorEngine.java +++ b/core/src/main/java/org/apache/oozie/CoordinatorEngine.java @@ -61,6 +61,7 @@ import org.apache.oozie.command.coord.CoordSLAChangeXCommand; import org.apache.oozie.command.coord.CoordSubmitXCommand; import org.apache.oozie.command.coord.CoordSuspendXCommand; import org.apache.oozie.command.coord.CoordUpdateXCommand; +import org.apache.oozie.command.coord.CoordWfActionInfoXCommand; import org.apache.oozie.dependency.ActionDependency; import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; @@ -963,4 +964,23 @@ public class CoordinatorEngine extends BaseEngine { String actions, String dates) throws CommandException { return new CoordActionMissingDependenciesXCommand(id, actions, dates).call(); } + + /** + * get wf actions by action name in a coordinator job + * @param jobId coordinator job id + * @param wfActionName workflow action name + * @param offset + * @param len + * @return list of CoordinatorWfActionBean in a coordinator + * @throws CoordinatorEngineException + */ + public List<CoordinatorWfActionBean> getWfActionByJobIdAndName(String jobId, String wfActionName, int offset, int len) + throws CoordinatorEngineException { + try { + return new CoordWfActionInfoXCommand(jobId, wfActionName, offset, len).call(); + } + catch (CommandException ex) { + throw new CoordinatorEngineException(ex); + } + } } 
http://git-wip-us.apache.org/repos/asf/oozie/blob/40618313/core/src/main/java/org/apache/oozie/CoordinatorEngine.java.orig ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/CoordinatorEngine.java.orig b/core/src/main/java/org/apache/oozie/CoordinatorEngine.java.orig new file mode 100644 index 0000000..2c04bea --- /dev/null +++ b/core/src/main/java/org/apache/oozie/CoordinatorEngine.java.orig @@ -0,0 +1,966 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oozie; + +import java.io.IOException; +import java.io.Writer; +import java.sql.Timestamp; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.StringTokenizer; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.oozie.client.CoordinatorAction; +import org.apache.oozie.client.CoordinatorJob; +import org.apache.oozie.client.OozieClient; +import org.apache.oozie.client.WorkflowJob; +import org.apache.oozie.client.rest.RestConstants; +import org.apache.oozie.command.CommandException; +import org.apache.oozie.command.OperationType; +import org.apache.oozie.command.coord.BulkCoordXCommand; +import org.apache.oozie.command.coord.CoordActionInfoXCommand; +import org.apache.oozie.command.coord.CoordActionsIgnoreXCommand; +import org.apache.oozie.command.coord.CoordActionsKillXCommand; +import org.apache.oozie.command.coord.CoordChangeXCommand; +import org.apache.oozie.command.coord.CoordActionMissingDependenciesXCommand; +import org.apache.oozie.command.coord.CoordJobXCommand; +import org.apache.oozie.command.coord.CoordJobsXCommand; +import org.apache.oozie.command.coord.CoordKillXCommand; +import org.apache.oozie.command.coord.CoordRerunXCommand; +import org.apache.oozie.command.coord.CoordResumeXCommand; +import org.apache.oozie.command.coord.CoordSLAAlertsDisableXCommand; +import org.apache.oozie.command.coord.CoordSLAAlertsEnableXCommand; +import org.apache.oozie.command.coord.CoordSLAChangeXCommand; +import org.apache.oozie.command.coord.CoordSubmitXCommand; +import org.apache.oozie.command.coord.CoordSuspendXCommand; +import org.apache.oozie.command.coord.CoordUpdateXCommand; +import 
org.apache.oozie.dependency.ActionDependency; +import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; +import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; +import org.apache.oozie.executor.jpa.JPAExecutorException; +import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; +import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; +import org.apache.oozie.service.DagXLogInfoService; +import org.apache.oozie.service.Services; +import org.apache.oozie.service.XLogStreamingService; +import org.apache.oozie.util.CoordActionsInDateRange; +import org.apache.oozie.util.DateUtils; +import org.apache.oozie.util.JobUtils; +import org.apache.oozie.util.Pair; +import org.apache.oozie.util.ParamChecker; +import org.apache.oozie.util.XLog; +import org.apache.oozie.util.XLogFilter; +import org.apache.oozie.util.XLogStreamer; +import org.apache.oozie.util.XLogUserFilterParam; + +import com.google.common.annotations.VisibleForTesting; + +public class CoordinatorEngine extends BaseEngine { + private static final XLog LOG = XLog.getLog(CoordinatorEngine.class); + public final static String COORD_ACTIONS_LOG_MAX_COUNT = "oozie.coord.actions.log.max.count"; + private final static int COORD_ACTIONS_LOG_MAX_COUNT_DEFAULT = 50; + private final int maxNumActionsForLog; + + public enum FILTER_COMPARATORS { + //This ordering is important, dont change this + GREATER_EQUAL(">="), GREATER(">"), LESSTHAN_EQUAL("<="), LESSTHAN("<"), NOT_EQUALS("!="), EQUALS("="); + + private final String sign; + + FILTER_COMPARATORS(String sign) { + this.sign = sign; + } + + public String getSign() { + return sign; + } + } + + public static final String[] VALID_JOB_FILTERS = {OozieClient.FILTER_STATUS, OozieClient.FILTER_NOMINAL_TIME}; + + /** + * Create a system Coordinator engine, with no user and no group. 
+ */ + public CoordinatorEngine() { + maxNumActionsForLog = Services.get().getConf() + .getInt(COORD_ACTIONS_LOG_MAX_COUNT, COORD_ACTIONS_LOG_MAX_COUNT_DEFAULT); + } + + /** + * Create a Coordinator engine to perform operations on behave of a user. + * + * @param user user name. + */ + public CoordinatorEngine(String user) { + this(); + this.user = ParamChecker.notEmpty(user, "user"); + } + + /* + * (non-Javadoc) + * + * @see org.apache.oozie.BaseEngine#getDefinition(java.lang.String) + */ + @Override + public String getDefinition(String jobId) throws BaseEngineException { + CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId); + return job.getOrigJobXml(); + } + + /** + * @param jobId + * @return CoordinatorJobBean + * @throws BaseEngineException + */ + private CoordinatorJobBean getCoordJobWithNoActionInfo(String jobId) throws BaseEngineException { + try { + return new CoordJobXCommand(jobId).call(); + } + catch (CommandException ex) { + throw new BaseEngineException(ex); + } + } + + /** + * @param actionId + * @return CoordinatorActionBean + * @throws BaseEngineException + */ + public CoordinatorActionBean getCoordAction(String actionId) throws BaseEngineException { + try { + return new CoordActionInfoXCommand(actionId).call(); + } + catch (CommandException ex) { + throw new BaseEngineException(ex); + } + } + + /* + * (non-Javadoc) + * + * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String) + */ + @Override + public CoordinatorJobBean getCoordJob(String jobId) throws BaseEngineException { + try { + return new CoordJobXCommand(jobId).call(); + } + catch (CommandException ex) { + throw new BaseEngineException(ex); + } + } + + /* + * (non-Javadoc) + * + * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String, java.lang.String, int, int) + */ + @Override + public CoordinatorJobBean getCoordJob(String jobId, String filter, int offset, int length, boolean desc) + throws BaseEngineException { + Map<Pair<String, FILTER_COMPARATORS>, 
List<Object>> filterMap = parseJobFilter(filter); + try { + return new CoordJobXCommand(jobId, filterMap, offset, length, desc).call(); + } + catch (CommandException ex) { + throw new BaseEngineException(ex); + } + } + + /* + * (non-Javadoc) + * + * @see org.apache.oozie.BaseEngine#getJobIdForExternalId(java.lang.String) + */ + @Override + public String getJobIdForExternalId(String externalId) throws CoordinatorEngineException { + return null; + } + + /* + * (non-Javadoc) + * + * @see org.apache.oozie.BaseEngine#kill(java.lang.String) + */ + @Override + public void kill(String jobId) throws CoordinatorEngineException { + try { + new CoordKillXCommand(jobId).call(); + LOG.info("User " + user + " killed the Coordinator job " + jobId); + } + catch (CommandException e) { + throw new CoordinatorEngineException(e); + } + } + + public CoordinatorActionInfo killActions(String jobId, String rangeType, String scope) throws CoordinatorEngineException { + try { + return new CoordActionsKillXCommand(jobId, rangeType, scope).call(); + } + catch (CommandException e) { + throw new CoordinatorEngineException(e); + } + } + + /* (non-Javadoc) + * @see org.apache.oozie.BaseEngine#change(java.lang.String, java.lang.String) + */ + @Override + public void change(String jobId, String changeValue) throws CoordinatorEngineException { + try { + new CoordChangeXCommand(jobId, changeValue).call(); + LOG.info("User " + user + " changed the Coordinator job [" + jobId + "] to " + changeValue); + } + catch (CommandException e) { + throw new CoordinatorEngineException(e); + } + } + + public CoordinatorActionInfo ignore(String jobId, String type, String scope) throws CoordinatorEngineException { + try { + LOG.info("User " + user + " ignore a Coordinator Action (s) [" + scope + "] of the Coordinator Job [" + + jobId + "]"); + return new CoordActionsIgnoreXCommand(jobId, type, scope).call(); + } + catch (CommandException e) { + throw new CoordinatorEngineException(e); + } + } + + @Override + 
@Deprecated + public void reRun(String jobId, Configuration conf) throws BaseEngineException { + throw new BaseEngineException(new XException(ErrorCode.E0301, "invalid use of rerun")); + } + + /** + * Rerun coordinator actions for given rerunType + * + * @param jobId + * @param rerunType + * @param scope + * @param refresh + * @param noCleanup + * @throws BaseEngineException + */ + public CoordinatorActionInfo reRun(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup, + boolean failed, Configuration conf) + throws BaseEngineException { + try { + return new CoordRerunXCommand(jobId, rerunType, scope, refresh, + noCleanup, failed, conf).call(); + } + catch (CommandException ex) { + throw new BaseEngineException(ex); + } + } + + /* + * (non-Javadoc) + * + * @see org.apache.oozie.BaseEngine#resume(java.lang.String) + */ + @Override + public void resume(String jobId) throws CoordinatorEngineException { + try { + new CoordResumeXCommand(jobId).call(); + } + catch (CommandException e) { + throw new CoordinatorEngineException(e); + } + } + + @Override + @Deprecated + public void start(String jobId) throws BaseEngineException { + throw new BaseEngineException(new XException(ErrorCode.E0301, "invalid use of start")); + } + + + @Override + protected void streamJobLog(XLogStreamer logStreamer, String jobId, Writer writer) + throws IOException, BaseEngineException { + logStreamer.getXLogFilter().setParameter(DagXLogInfoService.JOB, jobId); + Date lastTime = null; + CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId); + if (job.isTerminalStatus()) { + lastTime = job.getLastModifiedTime(); + } + if (lastTime == null) { + lastTime = new Date(); + } + Services.get().get(XLogStreamingService.class).streamLog(logStreamer, job.getCreatedTime(), lastTime, writer); + } + + /** + * Add list of actions to the filter based on conditions + * + * @param jobId Job Id + * @param logRetrievalScope Value for the retrieval type + * @param logRetrievalType 
Based on which filter criteria the log is retrieved + * @param writer writer to stream the log to + * @param requestParameters additional parameters from the request + * @throws IOException + * @throws BaseEngineException + * @throws CommandException + */ + public void streamLog(String jobId, String logRetrievalScope, String logRetrievalType, Writer writer, + Map<String, String[]> requestParameters) throws IOException, BaseEngineException, CommandException { + + Date startTime = null; + Date endTime = null; + XLogFilter filter = new XLogFilter(new XLogUserFilterParam(requestParameters)); + + filter.setParameter(DagXLogInfoService.JOB, jobId); + if (logRetrievalScope != null && logRetrievalType != null) { + // if coordinator action logs are to be retrieved based on action id range + if (logRetrievalType.equals(RestConstants.JOB_LOG_ACTION)) { + // Use set implementation that maintains order or elements to achieve reproducibility: + Set<String> actionSet = new LinkedHashSet<String>(); + String[] list = logRetrievalScope.split(","); + for (String s : list) { + s = s.trim(); + if (s.contains("-")) { + String[] range = s.split("-"); + if (range.length != 2) { + throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s + + "'"); + } + int start; + int end; + try { + start = Integer.parseInt(range[0].trim()); + } catch (NumberFormatException ne) { + throw new CommandException(ErrorCode.E0302, "could not parse " + range[0].trim() + "into an integer", + ne); + } + try { + end = Integer.parseInt(range[1].trim()); + } catch (NumberFormatException ne) { + throw new CommandException(ErrorCode.E0302, "could not parse " + range[1].trim() + "into an integer", + ne); + } + if (start > end) { + throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s + "'"); + } + for (int i = start; i <= end; i++) { + actionSet.add(jobId + "@" + i); + } + } + else { + try { + Integer.parseInt(s); + } + catch (NumberFormatException ne) { + 
throw new CommandException(ErrorCode.E0302, "format is wrong for action id'" + s + + "'. Integer only."); + } + actionSet.add(jobId + "@" + s); + } + } + + if (actionSet.size() >= maxNumActionsForLog) { + throw new CommandException(ErrorCode.E0302, + "Retrieving log of too many coordinator actions. Max count is " + + maxNumActionsForLog + " actions"); + } + Iterator<String> actionsIterator = actionSet.iterator(); + StringBuilder orSeparatedActions = new StringBuilder(""); + boolean orRequired = false; + while (actionsIterator.hasNext()) { + if (orRequired) { + orSeparatedActions.append("|"); + } + orSeparatedActions.append(actionsIterator.next().toString()); + orRequired = true; + } + if (actionSet.size() > 1 && orRequired) { + orSeparatedActions.insert(0, "("); + orSeparatedActions.append(")"); + } + + filter.setParameter(DagXLogInfoService.ACTION, orSeparatedActions.toString()); + if (actionSet != null && actionSet.size() == 1) { + CoordinatorActionBean actionBean = getCoordAction(actionSet.iterator().next()); + startTime = actionBean.getCreatedTime(); + endTime = actionBean.getStatus().equals(CoordinatorAction.Status.RUNNING) ? 
new Date() : actionBean + .getLastModifiedTime(); + filter.setActionList(true); + } + else if (actionSet != null && actionSet.size() > 0) { + List<String> tempList = new ArrayList<String>(actionSet); + Collections.sort(tempList, new Comparator<String>() { + public int compare(String a, String b) { + return Integer.valueOf(a.substring(a.lastIndexOf("@") + 1)).compareTo( + Integer.valueOf(b.substring(b.lastIndexOf("@") + 1))); + } + }); + startTime = getCoordAction(tempList.get(0)).getCreatedTime(); + endTime = CoordActionsInDateRange.getCoordActionsLastModifiedDate(jobId, tempList.get(0), + tempList.get(tempList.size() - 1)); + filter.setActionList(true); + } + } + // if coordinator action logs are to be retrieved based on date range + // this block gets the corresponding list of coordinator actions to be used by the log filter + if (logRetrievalType.equalsIgnoreCase(RestConstants.JOB_LOG_DATE)) { + List<String> coordActionIdList = null; + try { + coordActionIdList = CoordActionsInDateRange.getCoordActionIdsFromDates(jobId, logRetrievalScope); + } + catch (XException xe) { + throw new CommandException(ErrorCode.E0302, "Error in date range for coordinator actions", xe); + } + if(coordActionIdList.size() >= maxNumActionsForLog) { + throw new CommandException(ErrorCode.E0302, + "Retrieving log of too many coordinator actions. 
Max count is " + + maxNumActionsForLog + " actions"); + } + StringBuilder orSeparatedActions = new StringBuilder(""); + boolean orRequired = false; + for (String coordActionId : coordActionIdList) { + if (orRequired) { + orSeparatedActions.append("|"); + } + orSeparatedActions.append(coordActionId); + orRequired = true; + } + if (coordActionIdList.size() > 1 && orRequired) { + orSeparatedActions.insert(0, "("); + orSeparatedActions.append(")"); + } + filter.setParameter(DagXLogInfoService.ACTION, orSeparatedActions.toString()); + if (coordActionIdList != null && coordActionIdList.size() == 1) { + CoordinatorActionBean actionBean = getCoordAction(coordActionIdList.get(0)); + startTime = actionBean.getCreatedTime(); + endTime = actionBean.getStatus().equals(CoordinatorAction.Status.RUNNING) ? new Date() : actionBean + .getLastModifiedTime(); + filter.setActionList(true); + } + else if (coordActionIdList != null && coordActionIdList.size() > 0) { + Collections.sort(coordActionIdList, new Comparator<String>() { + public int compare(String a, String b) { + return Integer.valueOf(a.substring(a.lastIndexOf("@") + 1)).compareTo( + Integer.valueOf(b.substring(b.lastIndexOf("@") + 1))); + } + }); + startTime = getCoordAction(coordActionIdList.get(0)).getCreatedTime(); + endTime = CoordActionsInDateRange.getCoordActionsLastModifiedDate(jobId, coordActionIdList.get(0), + coordActionIdList.get(coordActionIdList.size() - 1)); + filter.setActionList(true); + } + } + } + if (startTime == null || endTime == null) { + CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId); + if (startTime == null) { + startTime = job.getCreatedTime(); + } + if (endTime == null) { + if (job.isTerminalStatus()) { + endTime = job.getLastModifiedTime(); + } + if (endTime == null) { + endTime = new Date(); + } + } + } + Services.get().get(XLogStreamingService.class).streamLog(new XLogStreamer(filter, requestParameters), startTime, + endTime, writer); + } + + /* + * (non-Javadoc) + * + * @see + * 
org.apache.oozie.BaseEngine#submitJob(org.apache.hadoop.conf.Configuration + * , boolean) + */ + @Override + public String submitJob(Configuration conf, boolean startJob) throws CoordinatorEngineException { + try { + CoordSubmitXCommand submit = new CoordSubmitXCommand(conf); + return submit.call(); + } + catch (CommandException ex) { + throw new CoordinatorEngineException(ex); + } + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.oozie.BaseEngine#dryRunSubmit(org.apache.hadoop.conf.Configuration) + */ + @Override + public String dryRunSubmit(Configuration conf) throws CoordinatorEngineException { + try { + CoordSubmitXCommand submit = new CoordSubmitXCommand(true, conf); + return submit.call(); + } + catch (CommandException ex) { + throw new CoordinatorEngineException(ex); + } + } + + /* + * (non-Javadoc) + * + * @see org.apache.oozie.BaseEngine#suspend(java.lang.String) + */ + @Override + public void suspend(String jobId) throws CoordinatorEngineException { + try { + new CoordSuspendXCommand(jobId).call(); + } + catch (CommandException e) { + throw new CoordinatorEngineException(e); + } + + } + + /* + * (non-Javadoc) + * + * @see org.apache.oozie.BaseEngine#getJob(java.lang.String) + */ + @Override + public WorkflowJob getJob(String jobId) throws BaseEngineException { + throw new BaseEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from CoordinatorEngine")); + } + + /* + * (non-Javadoc) + * + * @see org.apache.oozie.BaseEngine#getJob(java.lang.String, int, int) + */ + @Override + public WorkflowJob getJob(String jobId, int start, int length) throws BaseEngineException { + throw new BaseEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from CoordinatorEngine")); + } + + private static final Set<String> FILTER_NAMES = new HashSet<String>(); + + static { + FILTER_NAMES.add(OozieClient.FILTER_USER); + FILTER_NAMES.add(OozieClient.FILTER_NAME); + FILTER_NAMES.add(OozieClient.FILTER_GROUP); + 
FILTER_NAMES.add(OozieClient.FILTER_STATUS); + FILTER_NAMES.add(OozieClient.FILTER_ID); + FILTER_NAMES.add(OozieClient.FILTER_FREQUENCY); + FILTER_NAMES.add(OozieClient.FILTER_UNIT); + FILTER_NAMES.add(OozieClient.FILTER_SORT_BY); + FILTER_NAMES.add(OozieClient.FILTER_CREATED_TIME_START); + FILTER_NAMES.add(OozieClient.FILTER_CREATED_TIME_END); + FILTER_NAMES.add(OozieClient.FILTER_TEXT); + } + + /** + * @param filter + * @param start + * @param len + * @return CoordinatorJobInfo + * @throws CoordinatorEngineException + */ + public CoordinatorJobInfo getCoordJobs(String filter, int start, int len) throws CoordinatorEngineException { + Map<String, List<String>> filterList = parseJobsFilter(filter); + + try { + return new CoordJobsXCommand(filterList, start, len).call(); + } + catch (CommandException ex) { + throw new CoordinatorEngineException(ex); + } + } + + // Parses the filter string (e.g status=RUNNING;status=WAITING) and returns a list of status values + public Map<Pair<String, FILTER_COMPARATORS>, List<Object>> parseJobFilter(String filter) throws + CoordinatorEngineException { + Map<Pair<String, FILTER_COMPARATORS>, List<Object>> filterMap = new HashMap<Pair<String, + FILTER_COMPARATORS>, List<Object>>(); + if (filter != null) { + //split name value pairs + StringTokenizer st = new StringTokenizer(filter, ";"); + while (st.hasMoreTokens()) { + String token = st.nextToken().trim(); + Pair<String, FILTER_COMPARATORS> pair = null; + for (FILTER_COMPARATORS comp : FILTER_COMPARATORS.values()) { + if (token.contains(comp.getSign())) { + int index = token.indexOf(comp.getSign()); + String key = token.substring(0, index); + String valueStr = token.substring(index + comp.getSign().length()); + Object value; + + if (key.equalsIgnoreCase(OozieClient.FILTER_STATUS)) { + value = valueStr.toUpperCase(); + try { + CoordinatorAction.Status.valueOf((String) value); + } catch (IllegalArgumentException ex) { + // Check for incorrect status value + throw new 
CoordinatorEngineException(ErrorCode.E0421, filter, + XLog.format("invalid status value [{0}]." + " Valid status values are: [{1}]", + valueStr, StringUtils.join(CoordinatorAction.Status.values(), ", "))); + } + + if (!(comp == FILTER_COMPARATORS.EQUALS || comp == FILTER_COMPARATORS.NOT_EQUALS)) { + throw new CoordinatorEngineException(ErrorCode.E0421, filter, + XLog.format("invalid comparator [{0}] for status." + " Valid are = and !=", + comp.getSign())); + } + + pair = Pair.of(OozieClient.FILTER_STATUS, comp); + } else if (key.equalsIgnoreCase(OozieClient.FILTER_NOMINAL_TIME)) { + try { + value = new Timestamp(DateUtils.parseDateUTC(valueStr).getTime()); + } catch (ParseException e) { + throw new CoordinatorEngineException(ErrorCode.E0421, filter, + XLog.format("invalid nominal time [{0}]." + " Valid format: " + + "[{1}]", valueStr, DateUtils.ISO8601_UTC_MASK)); + } + pair = Pair.of(OozieClient.FILTER_NOMINAL_TIME, comp); + } else { + // Check for incorrect filter option + throw new CoordinatorEngineException(ErrorCode.E0421, filter, + XLog.format("invalid filter [{0}]." 
+ " Valid filters [{1}]", key, StringUtils.join + (VALID_JOB_FILTERS, ", "))); + } + if (!filterMap.containsKey(pair)) { + filterMap.put(pair, new ArrayList<Object>()); + } + filterMap.get(pair).add(value); + break; + } + } + + if (pair == null) { + //token doesn't contain comparator + throw new CoordinatorEngineException(ErrorCode.E0421, filter, + "filter should be of format <key><comparator><value> pairs"); + } + } + } + return filterMap; + } + + /** + * @param filter + * @return Map<String, List<String>> + * @throws CoordinatorEngineException + */ + @VisibleForTesting + Map<String, List<String>> parseJobsFilter(String filter) throws CoordinatorEngineException { + Map<String, List<String>> map = new HashMap<String, List<String>>(); + boolean isTimeUnitSpecified = false; + String timeUnit = "MINUTE"; + boolean isFrequencySpecified = false; + String frequency = ""; + if (filter != null) { + StringTokenizer st = new StringTokenizer(filter, ";"); + while (st.hasMoreTokens()) { + String token = st.nextToken(); + if (token.contains("=")) { + String[] pair = token.split("="); + if (pair.length != 2) { + throw new CoordinatorEngineException(ErrorCode.E0420, filter, + "elements must be semicolon-separated name=value pairs"); + } + pair[0] = pair[0].toLowerCase(); + if (!FILTER_NAMES.contains(pair[0])) { + throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format("invalid name [{0}]", + pair[0])); + } + if (pair[0].equalsIgnoreCase("frequency")) { + isFrequencySpecified = true; + try { + frequency = (int) Float.parseFloat(pair[1]) + ""; + continue; + } + catch (NumberFormatException NANException) { + throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format( + "invalid value [{0}] for frequency. 
A numerical value is expected", pair[1])); + } + } + if (pair[0].equalsIgnoreCase("unit")) { + isTimeUnitSpecified = true; + timeUnit = pair[1]; + if (!timeUnit.equalsIgnoreCase("months") && !timeUnit.equalsIgnoreCase("days") + && !timeUnit.equalsIgnoreCase("hours") && !timeUnit.equalsIgnoreCase("minutes")) { + throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format( + "invalid value [{0}] for time unit. " + + "Valid value is one of months, days, hours or minutes", pair[1])); + } + continue; + } + if (pair[0].equals("status")) { + try { + CoordinatorJob.Status.valueOf(pair[1]); + } + catch (IllegalArgumentException ex) { + throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format( + "invalid status [{0}]", pair[1])); + } + } + List<String> list = map.get(pair[0]); + if (list == null) { + list = new ArrayList<String>(); + map.put(pair[0], list); + } + list.add(pair[1]); + } else { + throw new CoordinatorEngineException(ErrorCode.E0420, filter, + "elements must be semicolon-separated name=value pairs"); + } + } + // Unit is specified and frequency is not specified + if (!isFrequencySpecified && isTimeUnitSpecified) { + throw new CoordinatorEngineException(ErrorCode.E0420, filter, "time unit should be added only when " + + "frequency is specified. Either specify frequency also or else remove the time unit"); + } else if (isFrequencySpecified) { + // Frequency value is specified + if (isTimeUnitSpecified) { + if (timeUnit.equalsIgnoreCase("months")) { + timeUnit = "MONTH"; + } else if (timeUnit.equalsIgnoreCase("days")) { + timeUnit = "DAY"; + } else if (timeUnit.equalsIgnoreCase("hours")) { + // When job details are persisted to database, frequency in hours are converted to minutes. + // This conversion is to conform with that. 
+ frequency = Integer.parseInt(frequency) * 60 + ""; + timeUnit = "MINUTE"; + } else if (timeUnit.equalsIgnoreCase("minutes")) { + timeUnit = "MINUTE"; + } + } + // Adding the frequency and time unit filters to the filter map + List<String> list = new ArrayList<String>(); + list.add(timeUnit); + map.put("unit", list); + list = new ArrayList<String>(); + list.add(frequency); + map.put("frequency", list); + } + } + return map; + } + + public List<WorkflowJobBean> getReruns(String coordActionId) throws CoordinatorEngineException { + List<WorkflowJobBean> wfBeans; + try { + wfBeans = WorkflowJobQueryExecutor.getInstance().getList(WorkflowJobQuery.GET_WORKFLOWS_PARENT_COORD_RERUN, + coordActionId); + } + catch (JPAExecutorException e) { + throw new CoordinatorEngineException(e); + } + return wfBeans; + } + + /** + * Update coord job definition. + * + * @param conf the conf + * @param jobId the job id + * @param dryrun the dryrun + * @param showDiff the show diff + * @return the string + * @throws CoordinatorEngineException the coordinator engine exception + */ + public String updateJob(Configuration conf, String jobId, boolean dryrun, boolean showDiff) + throws CoordinatorEngineException { + try { + CoordUpdateXCommand update = new CoordUpdateXCommand(dryrun, conf, jobId, showDiff); + return update.call(); + } + catch (CommandException ex) { + throw new CoordinatorEngineException(ex); + } + } + + /** + * Return the status for a Job ID + * + * @param jobId job Id. 
+ * @return the job's status + * @throws CoordinatorEngineException thrown if the job's status could not be obtained + */ + @Override + public String getJobStatus(String jobId) throws CoordinatorEngineException { + try { + CoordinatorJobBean coordJob = CoordJobQueryExecutor.getInstance().get( + CoordJobQueryExecutor.CoordJobQuery.GET_COORD_JOB_STATUS, jobId); + return coordJob.getStatusStr(); + } + catch (JPAExecutorException e) { + throw new CoordinatorEngineException(e); + } + } + + /** + * Return the status for an Action ID + * + * @param actionId action Id. + * @return the action's status + * @throws CoordinatorEngineException thrown if the action's status could not be obtained + */ + public String getActionStatus(String actionId) throws CoordinatorEngineException { + try { + CoordinatorActionBean coordAction = CoordActionQueryExecutor.getInstance().get( + CoordActionQueryExecutor.CoordActionQuery.GET_COORD_ACTION_STATUS, actionId); + return coordAction.getStatusStr(); + } + catch (JPAExecutorException e) { + throw new CoordinatorEngineException(e); + } + } + + @Override + public void disableSLAAlert(String id, String actions, String dates, String childIds) throws BaseEngineException { + try { + new CoordSLAAlertsDisableXCommand(id, actions, dates).call(); + + } + catch (CommandException e) { + throw new CoordinatorEngineException(e); + } + } + + @Override + public void changeSLA(String id, String actions, String dates, String childIds, String newParams) + throws BaseEngineException { + Map<String, String> slaNewParams = null; + + try { + + if (newParams != null) { + slaNewParams = JobUtils.parseChangeValue(newParams); + } + + new CoordSLAChangeXCommand(id, actions, dates, slaNewParams).call(); + + } + catch (CommandException e) { + throw new CoordinatorEngineException(e); + } + } + + @Override + public void enableSLAAlert(String id, String actions, String dates, String childIds) throws BaseEngineException { + try { + new CoordSLAAlertsEnableXCommand(id, 
actions, dates).call(); + + } + catch (CommandException e) { + throw new CoordinatorEngineException(e); + } + } + + /** + * Kill the coordinator jobs selected by the filter and return them. + * + * @param filter the filter string selecting the coordinator jobs to be killed + * @param start the starting index for coordinator jobs + * @param length maximum number of jobs to be killed + * @return the list of jobs being killed; an empty result if the bulk command returns null + * @throws CoordinatorEngineException thrown if one or more of the jobs cannot be killed + */ + public CoordinatorJobInfo killJobs(String filter, int start, int length) throws CoordinatorEngineException { + try { + Map<String, List<String>> filterMap = parseJobsFilter(filter); + CoordinatorJobInfo coordinatorJobInfo = + new BulkCoordXCommand(filterMap, start, length, OperationType.Kill).call(); + if (coordinatorJobInfo == null) { + return new CoordinatorJobInfo(new ArrayList<CoordinatorJobBean>(), 0, 0, 0); + } + return coordinatorJobInfo; + } + catch (CommandException ex) { + throw new CoordinatorEngineException(ex); + } + } + + /** + * Suspend the coordinator jobs selected by the filter and return them. + * @param filter Filter for jobs that will be suspended, can be name, user, group, status, id or combination of any + * @param start Offset for the jobs that will be suspended + * @param length maximum number of jobs that will be suspended + * @return the jobs being suspended; an empty result if the bulk command returns null + * @throws CoordinatorEngineException if the filter cannot be parsed or the bulk command fails + */ + public CoordinatorJobInfo suspendJobs(String filter, int start, int length) throws CoordinatorEngineException { + try { + Map<String, List<String>> filterMap = parseJobsFilter(filter); + CoordinatorJobInfo coordinatorJobInfo = + new BulkCoordXCommand(filterMap, start, length, OperationType.Suspend).call(); + if (coordinatorJobInfo == null) { + return new CoordinatorJobInfo(new ArrayList<CoordinatorJobBean>(), 0, 0, 0); + } + return coordinatorJobInfo; + } + catch (CommandException ex) { + throw new CoordinatorEngineException(ex); + } + } + + /** + * return the jobs that've been resumed + * @param filter 
Filter for jobs that will be resumed, can be name, user, group, status, id or combination of any + * @param start Offset for the jobs that will be resumed + * @param length maximum number of jobs that will be resumed + * @return + * @throws CoordinatorEngineException + */ + public CoordinatorJobInfo resumeJobs(String filter, int start, int length) throws CoordinatorEngineException { + try { + Map<String, List<String>> filterMap = parseJobsFilter(filter); + CoordinatorJobInfo coordinatorJobInfo = + new BulkCoordXCommand(filterMap, start, length, OperationType.Resume).call(); + if (coordinatorJobInfo == null) { + return new CoordinatorJobInfo(new ArrayList<CoordinatorJobBean>(), 0, 0, 0); + } + return coordinatorJobInfo; + } + catch (CommandException ex) { + throw new CoordinatorEngineException(ex); + } + } + /** + * Get coord action missing dependencies + * @param id jobID + * @param actions action list + * @param dates nominal time list + * @return pair of coord action bean and list of missing input dependencies. + * @throws CommandException + */ + public List<Pair<CoordinatorActionBean, Map<String, ActionDependency>>> getCoordActionMissingDependencies(String id, + String actions, String dates) throws CommandException { + return new CoordActionMissingDependenciesXCommand(id, actions, dates).call(); + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/40618313/core/src/main/java/org/apache/oozie/CoordinatorWfActionBean.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/CoordinatorWfActionBean.java b/core/src/main/java/org/apache/oozie/CoordinatorWfActionBean.java new file mode 100644 index 0000000..8a40e20 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/CoordinatorWfActionBean.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie; + +import org.apache.oozie.client.CoordinatorWfAction; +import org.apache.oozie.client.rest.JsonBean; +import org.apache.oozie.client.rest.JsonTags; +import org.json.simple.JSONObject; + +public class CoordinatorWfActionBean implements CoordinatorWfAction, JsonBean{ + + private int actionNumber; + + private WorkflowActionBean action; + + private String strNullReason; + + public CoordinatorWfActionBean(int actionNumber) { + this(actionNumber, null, null); + } + + public CoordinatorWfActionBean(int actionNumber, WorkflowActionBean action, String nullReason) { + this.actionNumber = actionNumber; + this.action = action; + this.strNullReason = nullReason; + } + + public int getActionNumber() { + return actionNumber; + } + + public WorkflowActionBean getAction() { + return action; + } + + public String getNullReason() { + return strNullReason; + } + + public void setActionNumber(int actionNumber) { + this.actionNumber = actionNumber; + } + + public void setAction(WorkflowActionBean action) { + this.action = action; + } + + public void setNullReason(String nullReason) { + this.strNullReason = nullReason; + } + + @Override + public JSONObject toJSONObject() { + return toJSONObject("GMT"); + } + + @Override + public JSONObject toJSONObject(String 
timeZoneId) { + JSONObject json = new JSONObject(); + json.put(JsonTags.COORDINATOR_WF_ACTION_NUMBER, actionNumber); + json.put(JsonTags.COORDINATOR_WF_ACTION_NULL_REASON, strNullReason); + if (action != null) { + json.put(JsonTags.COORDINATOR_WF_ACTION, action.toJSONObject(timeZoneId)); + } + else { + json.put(JsonTags.COORDINATOR_WF_ACTION, action); + } + return json; + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/40618313/core/src/main/java/org/apache/oozie/ErrorCode.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/ErrorCode.java b/core/src/main/java/org/apache/oozie/ErrorCode.java index b03ad06..662e1ed 100644 --- a/core/src/main/java/org/apache/oozie/ErrorCode.java +++ b/core/src/main/java/org/apache/oozie/ErrorCode.java @@ -105,6 +105,7 @@ public enum ErrorCode { E0609(XLog.OPS, "Missing [{0}] ORM file [{1}]"), E0610(XLog.OPS, "Missing JPAService, StoreService cannot run without a JPAService"), E0611(XLog.OPS, "SQL error in operation [{0}], {1}"), + E0612(XLog.OPS, "Could not get coordinator actions"), E0700(XLog.STD, "XML error, {0}"), E0701(XLog.STD, "XML schema error, {0}"), http://git-wip-us.apache.org/repos/asf/oozie/blob/40618313/core/src/main/java/org/apache/oozie/command/coord/CoordWfActionInfoXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordWfActionInfoXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordWfActionInfoXCommand.java new file mode 100644 index 0000000..8536ed9 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/command/coord/CoordWfActionInfoXCommand.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.command.coord; + +import org.apache.oozie.CoordinatorActionBean; +import org.apache.oozie.CoordinatorWfActionBean; +import org.apache.oozie.ErrorCode; +import org.apache.oozie.WorkflowActionBean; +import org.apache.oozie.client.CoordinatorWfAction; +import org.apache.oozie.command.CommandException; +import org.apache.oozie.command.PreconditionException; +import org.apache.oozie.executor.jpa.CoordJobGetActionsSubsetJPAExecutor; +import org.apache.oozie.executor.jpa.JPAExecutorException; +import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor; +import org.apache.oozie.service.JPAService; +import org.apache.oozie.service.Services; +import org.apache.oozie.util.ParamChecker; + +import java.util.ArrayList; +import java.util.List; + +public class CoordWfActionInfoXCommand extends CoordinatorXCommand<List<CoordinatorWfActionBean>>{ + /** + * This class gets the wf action info in coordinator by action name and coordinator job ID. 
+ */ + private static final String ACTION_INFO = "action.info"; + private static final int DEFAULT_OFFSET = 1; + private static final int DEFAULT_LEN = 50; + + private final String jobId; + private final String actionName; + private final int offset; + private final int len; + private List<CoordinatorActionBean> coordActions; + private JPAService jpaService = null; + + public CoordWfActionInfoXCommand(String jobId, String actionName) { + this(jobId, actionName, DEFAULT_OFFSET, DEFAULT_LEN); + } + + public CoordWfActionInfoXCommand(String jobId, String actionName, int offset, int len) { + super(ACTION_INFO, ACTION_INFO, 1); + + this.jobId = ParamChecker.notEmpty(jobId, "jobId"); + this.actionName = ParamChecker.notEmpty(actionName, "actionName"); + this.offset = offset; + this.len = len; + } + + @Override + protected List<CoordinatorWfActionBean> execute() throws CommandException { + List<CoordinatorWfActionBean> coordWfActions = new ArrayList<CoordinatorWfActionBean>(); + for(CoordinatorActionBean coordAction : coordActions) { + String wfId = coordAction.getExternalId(); + String nullReason = null; + WorkflowActionBean wfAction = null; + if (wfId != null) { + String wfActionId = wfId + "@" + actionName; + try { + wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(wfActionId, true)); + if (wfAction == null) { + nullReason = CoordinatorWfAction.NullReason.ACTION_NULL.getNullReason(actionName, wfId); + } + } catch (JPAExecutorException ex) { + throw new CommandException(ex); + } + } else { + nullReason = CoordinatorWfAction.NullReason.PARENT_NULL.getNullReason(); + LOG.warn(nullReason); + wfAction = null; + } + int actionNumber = coordAction.getActionNumber(); + CoordinatorWfActionBean coordWfAction = new CoordinatorWfActionBean(actionNumber, wfAction, nullReason); + + coordWfActions.add(coordWfAction); + } + return coordWfActions; + } + + /** + * (non-Javadoc) + * @see org.apache.oozie.command.XCommand#loadState() + **/ + @Override + protected void 
loadState() throws CommandException { + jpaService = Services.get().get(JPAService.class); + if (jpaService != null) { + try { + coordActions = jpaService.execute( + new CoordJobGetActionsSubsetJPAExecutor(jobId, null, offset, len, false)); + } catch (JPAExecutorException ex) { + LOG.error(ErrorCode.E0612); + throw new CommandException(ex); + } + } + else { + throw new CommandException(ErrorCode.E0610); + } + } + + /** + * (non-Javadoc) + * @see org.apache.oozie.command.XCommand#verifyPrecondition() + * no-op + **/ + @Override + protected void verifyPrecondition() throws CommandException, PreconditionException { + } + + /** + * (non-Javadoc) + * @see org.apache.oozie.command.XCommand#isLockRequired() + **/ + @Override + protected boolean isLockRequired() { + return false; + } + + /** + * (non-Javadoc) + * @see org.apache.oozie.command.XCommand#getEntityKey() + **/ + @Override + public String getEntityKey() { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/oozie/blob/40618313/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionGetJPAExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionGetJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionGetJPAExecutor.java index 0b7f50d..f8ac11e 100644 --- a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionGetJPAExecutor.java +++ b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionGetJPAExecutor.java @@ -33,11 +33,19 @@ import org.apache.oozie.util.XLog; */ public class WorkflowActionGetJPAExecutor implements JPAExecutor<WorkflowActionBean> { + public XLog LOG = XLog.getLog(getClass()); + private String wfActionId = null; + private final boolean isNullAcceptable; public WorkflowActionGetJPAExecutor(String wfActionId) { + this(wfActionId, false); + } + + public WorkflowActionGetJPAExecutor(String wfActionId, boolean isNullAcceptable) { 
ParamChecker.notNull(wfActionId, "wfActionId"); this.wfActionId = wfActionId; + this.isNullAcceptable = isNullAcceptable; } /* (non-Javadoc) @@ -69,7 +77,13 @@ public class WorkflowActionGetJPAExecutor implements JPAExecutor<WorkflowActionB return bean; } else { - throw new JPAExecutorException(ErrorCode.E0605, wfActionId); + if (isNullAcceptable) { + LOG.warn("Could not get workflow action {0}", wfActionId); + return null; + } + else { + throw new JPAExecutorException(ErrorCode.E0605, wfActionId); + } } } } http://git-wip-us.apache.org/repos/asf/oozie/blob/40618313/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java b/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java index 03acbc1..a9ea615 100644 --- a/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java +++ b/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java @@ -48,6 +48,8 @@ public abstract class BaseJobServlet extends JsonRestServlet { private static final ResourceInfo RESOURCES_INFO[] = new ResourceInfo[1]; + final static String NOT_SUPPORTED_MESSAGE = "Not supported in this version"; + static { RESOURCES_INFO[0] = new ResourceInfo("*", Arrays.asList("PUT", "GET"), Arrays.asList(new ParameterInfo( RestConstants.ACTION_PARAM, String.class, true, Arrays.asList("PUT")), new ParameterInfo( @@ -368,6 +370,12 @@ public abstract class BaseJobServlet extends JsonRestServlet { startCron(); sendJsonResponse(response, HttpServletResponse.SC_OK, json); } + else if (show.equals(RestConstants.JOB_SHOW_WF_ACTIONS_IN_COORD)) { + stopCron(); + JSONObject json = getWfActionByJobIdAndName(request, response); + startCron(); + sendJsonResponse(response, HttpServletResponse.SC_OK, json); + } else { throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, RestConstants.JOB_SHOW_PARAM, show); @@ -607,4 +615,18 @@ 
public abstract class BaseJobServlet extends JsonRestServlet { */ abstract JSONObject getCoordActionMissingDependencies(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException; + + /** + * get wf actions by name in coordinator job + * + * @param request the request + * @param response the response + * @return the JSON object + * @throws XServletException the x servlet exception + * @throws IOException Signals that an I/O exception has occurred. + */ + protected JSONObject getWfActionByJobIdAndName(HttpServletRequest request, HttpServletResponse response) + throws XServletException, IOException { + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, NOT_SUPPORTED_MESSAGE); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/40618313/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java b/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java index 6c30f5d..c2b90c1 100644 --- a/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java +++ b/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java @@ -35,6 +35,8 @@ import org.apache.oozie.CoordinatorActionBean; import org.apache.oozie.CoordinatorActionInfo; import org.apache.oozie.CoordinatorEngine; import org.apache.oozie.CoordinatorEngineException; +import org.apache.oozie.CoordinatorWfActionBean; +import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.DagEngine; import org.apache.oozie.DagEngineException; import org.apache.oozie.ErrorCode; @@ -50,6 +52,7 @@ import org.apache.oozie.service.BundleEngineService; import org.apache.oozie.service.CoordinatorEngineService; import org.apache.oozie.service.DagEngineService; import org.apache.oozie.service.Services; +import org.apache.oozie.service.ConfigurationService; import 
org.apache.oozie.util.Pair; import org.json.simple.JSONArray; import org.json.simple.JSONObject; @@ -381,4 +384,46 @@ public class V2JobServlet extends V1JobServlet { } } + @Override + protected JSONObject getWfActionByJobIdAndName(HttpServletRequest request, HttpServletResponse response) + throws XServletException, IOException { + CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( + getUser(request)); + String jobId = getResourceName(request); + String action = request.getParameter(RestConstants.ACTION_NAME_PARAM); + String startStr = request.getParameter(RestConstants.OFFSET_PARAM); + String lenStr = request.getParameter(RestConstants.LEN_PARAM); + String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM); + timeZoneId = (timeZoneId == null) ? "GMT" : timeZoneId; + + if (action == null) { + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, + ErrorCode.E0305, RestConstants.ACTION_NAME_PARAM); + } + + int offset = (startStr != null) ? Integer.parseInt(startStr) : 1; + offset = (offset < 1) ? 1 : offset; + /** + * set default number of wf actions to be retrieved to + * default number of coordinator actions to be retrieved + **/ + int defaultLen = ConfigurationService.getInt(COORD_ACTIONS_DEFAULT_LENGTH); + int len = (lenStr != null) ? 
Integer.parseInt(lenStr) : 0; + len = getCoordinatorJobLength(defaultLen, len); + + try { + JSONObject json = new JSONObject(); + List<CoordinatorWfActionBean> coordWfActions = coordEngine.getWfActionByJobIdAndName(jobId, action, offset, len); + JSONArray array = new JSONArray(); + for (CoordinatorWfActionBean coordWfAction : coordWfActions) { + array.add(coordWfAction.toJSONObject(timeZoneId)); + } + json.put(JsonTags.COORDINATOR_JOB_ID, jobId); + json.put(JsonTags.COORDINATOR_WF_ACTIONS, array); + return json; + } + catch (CoordinatorEngineException ex) { + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); + } + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/40618313/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java b/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java index 564db2a..dbc160f 100644 --- a/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java +++ b/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java @@ -542,7 +542,7 @@ public class TestOozieCLI extends DagServletTestCase { MockCoordinatorEngineService.JOB_ID + "1", "-action", "1" }; assertEquals(0, new OozieCLI().run(args)); assertEquals(RestConstants.JOB_ACTION_KILL, MockCoordinatorEngineService.did); - assertFalse(MockCoordinatorEngineService.started.get(1)); + assertFalse(MockCoordinatorEngineService.startedCoordJobs.get(1)); return null; } }); @@ -566,7 +566,7 @@ public class TestOozieCLI extends DagServletTestCase { MockCoordinatorEngineService.JOB_ID + "1", "-date", "2009-12-15T01:00Z::2009-12-16T01:00Z" }; assertEquals(0, new OozieCLI().run(args)); assertEquals(RestConstants.JOB_ACTION_KILL, MockCoordinatorEngineService.did); - assertFalse(MockCoordinatorEngineService.started.get(1)); + assertFalse(MockCoordinatorEngineService.startedCoordJobs.get(1)); return null; } }); @@ -608,7 +608,7 
@@ public class TestOozieCLI extends DagServletTestCase { "-action", "1" }; assertEquals(0, new OozieCLI().run(args)); assertEquals(RestConstants.JOB_COORD_ACTION_RERUN, MockCoordinatorEngineService.did); - assertTrue(MockCoordinatorEngineService.started.get(1)); + assertTrue(MockCoordinatorEngineService.startedCoordJobs.get(1)); return null; } }); @@ -632,7 +632,7 @@ public class TestOozieCLI extends DagServletTestCase { "-date", "2009-12-15T01:00Z::2009-12-16T01:00Z" }; assertEquals(0, new OozieCLI().run(args)); assertEquals(RestConstants.JOB_COORD_ACTION_RERUN, MockCoordinatorEngineService.did); - assertTrue(MockCoordinatorEngineService.started.get(1)); + assertTrue(MockCoordinatorEngineService.startedCoordJobs.get(1)); return null; } }); @@ -656,7 +656,7 @@ public class TestOozieCLI extends DagServletTestCase { "-action", "0", "-refresh" }; assertEquals(0, new OozieCLI().run(args)); assertEquals(RestConstants.JOB_COORD_ACTION_RERUN, MockCoordinatorEngineService.did); - assertTrue(MockCoordinatorEngineService.started.get(0)); + assertTrue(MockCoordinatorEngineService.startedCoordJobs.get(0)); return null; } }); @@ -679,7 +679,7 @@ public class TestOozieCLI extends DagServletTestCase { "-action", "0", "-nocleanup" }; assertEquals(0, new OozieCLI().run(args)); assertEquals(RestConstants.JOB_COORD_ACTION_RERUN, MockCoordinatorEngineService.did); - assertTrue(MockCoordinatorEngineService.started.get(0)); + assertTrue(MockCoordinatorEngineService.startedCoordJobs.get(0)); return null; } }); @@ -703,7 +703,7 @@ public class TestOozieCLI extends DagServletTestCase { "-date", "2009-12-15T01:00Z", "-action", "1" }; assertEquals(-1, new OozieCLI().run(args)); assertNull(MockCoordinatorEngineService.did); - assertFalse(MockCoordinatorEngineService.started.get(1)); + assertFalse(MockCoordinatorEngineService.startedCoordJobs.get(1)); return null; } }); @@ -726,7 +726,7 @@ public class TestOozieCLI extends DagServletTestCase { MockCoordinatorEngineService.JOB_ID + "1" + 
MockDagEngineService.JOB_ID_END}; assertEquals(-1, new OozieCLI().run(args)); assertNull(MockCoordinatorEngineService.did); - assertFalse(MockCoordinatorEngineService.started.get(1)); + assertFalse(MockCoordinatorEngineService.startedCoordJobs.get(1)); return null; } }); @@ -751,7 +751,7 @@ public class TestOozieCLI extends DagServletTestCase { "-rerun", MockCoordinatorEngineService.JOB_ID + "0" }; assertEquals(-1, new OozieCLI().run(args)); assertNull(MockCoordinatorEngineService.did); - assertFalse(MockCoordinatorEngineService.started.get(1)); + assertFalse(MockCoordinatorEngineService.startedCoordJobs.get(1)); return null; } }); @@ -776,7 +776,7 @@ public class TestOozieCLI extends DagServletTestCase { assertEquals(-1, new OozieCLI().run(args)); assertNull(MockCoordinatorEngineService.did); - assertFalse(MockCoordinatorEngineService.started.get(1)); + assertFalse(MockCoordinatorEngineService.startedCoordJobs.get(1)); return null; } }); @@ -790,7 +790,7 @@ public class TestOozieCLI extends DagServletTestCase { String[] args = new String[]{"job", "-oozie", oozieUrl, "-ignore", MockCoordinatorEngineService.JOB_ID + "1"}; assertEquals(0, new OozieCLI().run(args)); assertEquals(RestConstants.JOB_ACTION_CHANGE, MockCoordinatorEngineService.did); - assertTrue(MockCoordinatorEngineService.started.get(1)); + assertTrue(MockCoordinatorEngineService.startedCoordJobs.get(1)); // negative test for "oozie job -ignore <non-existent coord>" MockCoordinatorEngineService.reset(); @@ -799,7 +799,7 @@ public class TestOozieCLI extends DagServletTestCase { MockDagEngineService.JOB_ID + (MockCoordinatorEngineService.coordJobs.size() + 1)}; assertEquals(-1, new OozieCLI().run(args)); assertNull(MockCoordinatorEngineService.did); - assertFalse(MockCoordinatorEngineService.started.get(1)); + assertFalse(MockCoordinatorEngineService.startedCoordJobs.get(1)); return null; } }); @@ -814,7 +814,7 @@ public class TestOozieCLI extends DagServletTestCase { MockCoordinatorEngineService.JOB_ID + 
"1", "-action", "1"}; assertEquals(0, new OozieCLI().run(args)); assertEquals(RestConstants.JOB_ACTION_IGNORE, MockCoordinatorEngineService.did); - assertTrue(MockCoordinatorEngineService.started.get(1)); + assertTrue(MockCoordinatorEngineService.startedCoordJobs.get(1)); // negative test for "oozie job -ignore <non-existent coord> -action 1" MockCoordinatorEngineService.reset(); @@ -822,7 +822,7 @@ public class TestOozieCLI extends DagServletTestCase { MockDagEngineService.JOB_ID + (MockCoordinatorEngineService.coordJobs.size() + 1), "-action", "1" }; assertEquals(-1, new OozieCLI().run(args)); assertNull(MockCoordinatorEngineService.did); - assertFalse(MockCoordinatorEngineService.started.get(1)); + assertFalse(MockCoordinatorEngineService.startedCoordJobs.get(1)); // negative test for "oozie job -ignore <id> -action (action is empty)" MockCoordinatorEngineService.reset(); @@ -830,7 +830,7 @@ public class TestOozieCLI extends DagServletTestCase { MockCoordinatorEngineService.JOB_ID, "-action", ""}; assertEquals(-1, new OozieCLI().run(args)); assertNull(MockCoordinatorEngineService.did); - assertFalse(MockCoordinatorEngineService.started.get(1)); + assertFalse(MockCoordinatorEngineService.startedCoordJobs.get(1)); return null; } @@ -1417,7 +1417,7 @@ public class TestOozieCLI extends DagServletTestCase { "-oozie", oozieUrl, "-Doozie.proxysubmission=true" }; assertEquals(0, new OozieCLI().run(args)); assertEquals(MockCoordinatorEngineService.did, RestConstants.JOB_ACTION_DRYRUN); - assertFalse(MockCoordinatorEngineService.started.get(1)); + assertFalse(MockCoordinatorEngineService.startedCoordJobs.get(1)); return null; } }); @@ -1432,7 +1432,7 @@ public class TestOozieCLI extends DagServletTestCase { String[] args = new String[] { "job", "-update", "aaa", "-oozie", oozieUrl }; assertEquals(-1, new OozieCLI().run(args)); assertEquals(MockCoordinatorEngineService.did, RestConstants.JOB_COORD_UPDATE ); - assertFalse(MockCoordinatorEngineService.started.get(1)); + 
assertFalse(MockCoordinatorEngineService.startedCoordJobs.get(1)); return null; } }); @@ -1450,7 +1450,7 @@ public class TestOozieCLI extends DagServletTestCase { assertEquals(-1, new OozieCLI().run(args)); assertEquals(MockCoordinatorEngineService.did, RestConstants.JOB_COORD_UPDATE + "&" + RestConstants.JOB_ACTION_DRYRUN); - assertFalse(MockCoordinatorEngineService.started.get(1)); + assertFalse(MockCoordinatorEngineService.startedCoordJobs.get(1)); return null; } }); @@ -1691,7 +1691,7 @@ public class TestOozieCLI extends DagServletTestCase { String[] args = new String[] { "job", "-missingdeps", "aaa-C", "-oozie", oozieUrl }; assertEquals(0, new OozieCLI().run(args)); assertEquals(MockCoordinatorEngineService.did, RestConstants.COORD_ACTION_MISSING_DEPENDENCIES); - assertFalse(MockCoordinatorEngineService.started.get(1)); + assertFalse(MockCoordinatorEngineService.startedCoordJobs.get(1)); return null; } }); http://git-wip-us.apache.org/repos/asf/oozie/blob/40618313/core/src/test/java/org/apache/oozie/command/coord/TestCoordWfActionInfoXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordWfActionInfoXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordWfActionInfoXCommand.java new file mode 100644 index 0000000..ceaa707 --- /dev/null +++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordWfActionInfoXCommand.java @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.command.coord; + +import org.apache.oozie.CoordinatorActionBean; +import org.apache.oozie.CoordinatorJobBean; +import org.apache.oozie.WorkflowActionBean; +import org.apache.oozie.WorkflowJobBean; +import org.apache.oozie.client.CoordinatorAction; +import org.apache.oozie.client.CoordinatorJob; +import org.apache.oozie.client.WorkflowAction; +import org.apache.oozie.client.WorkflowJob; +import org.apache.oozie.command.CommandException; +import org.apache.oozie.executor.jpa.JPAExecutorException; +import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; +import org.apache.oozie.local.LocalOozie; +import org.apache.oozie.service.JPAService; +import org.apache.oozie.service.Services; +import org.apache.oozie.test.XDataTestCase; +import org.apache.oozie.workflow.WorkflowInstance; +import org.apache.oozie.CoordinatorWfActionBean; +import org.apache.oozie.client.CoordinatorWfAction; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.List; + +public class TestCoordWfActionInfoXCommand extends XDataTestCase { + Services services; + + private CoordinatorJobBean coordJob; + private List<WorkflowJobBean> wfJobs; + private List<CoordinatorActionBean> coordActions; + + @Override + protected void setUp() throws Exception { + super.setUp(); + services = new Services(); + services.init(); + + createTestData(); + } + + @Override + protected void tearDown() throws Exception { + services.destroy(); + super.tearDown(); + } + /** + * init the test case. 
+ * 1 coordJob + * 5 coordActions created by the coordJob; the 5th coordAction's workflow instance is null + * 4 wfJobs matching the 1st ~ 4th coordActions + * the 1st - 3rd wfJobs each have a wfAction named 'aa', but the 4th doesn't. + */ + private void createTestData() throws Exception { + JPAService jpaService = Services.get().get(JPAService.class); + assertNotNull("Missing jpa service", jpaService); + + coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false); + wfJobs = new ArrayList<WorkflowJobBean>(); + coordActions = new ArrayList<CoordinatorActionBean>(); + + for(int i = 0; i < 4; i++) { + WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED); + wfJobs.add(wfJob); + } + for(int i = 0; i < 4; i++) { + CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), (i+1), + CoordinatorAction.Status.SUCCEEDED,"coord-action-get.xml", wfJobs.get(i).getId(), "SUCCEEDED", 0); + coordActions.add(coordAction); + } + + //add a coordAction that doesn't have a workflow instance yet + CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), 5, + CoordinatorAction.Status.SUCCEEDED,"coord-action-get.xml", null, null, 0); + coordActions.add(coordAction); + + //set the NominalTime, in order to keep the order of the coordActions. + for(int i = 0; i < 5; i++) { + setCoordActionNominalTime(coordActions.get(i).getId(), (i+1) * 1000); + } + + //create the case that the 4th wfJob doesn't have an action named "aa" + for(int i = 0; i < 4; i++) { + String name = (i == 3) ? 
"bb" : "aa"; + addRecordToWfActionTable(wfJobs.get(i).getId(), name, WorkflowAction.Status.DONE); + } + } + + public void testNormalCase() throws Exception { + int offset = 2, len = 2; + List<CoordinatorWfActionBean> coordWfActions = new CoordWfActionInfoXCommand(coordJob.getId(), "aa", offset, len).call(); + assertEquals(2, coordWfActions.size()); + List<String> wfIds = Arrays.asList(wfJobs.get(1).getId(), wfJobs.get(2).getId()); + + for(int i = 0; i < coordWfActions.size(); i++) { + CoordinatorWfActionBean coordWfAction = coordWfActions.get(i); + WorkflowActionBean wfAction = coordWfAction.getAction(); + + assertEquals(i + offset, coordWfActions.get(i).getActionNumber()); + assertEquals(wfIds.get(i), wfAction.getWfId()); + assertEquals(null, coordWfAction.getNullReason()); + } + } + + public void testActionMissing() throws CommandException{ + List<CoordinatorWfActionBean> coordWfActions = new CoordWfActionInfoXCommand(coordJob.getId(), "aa", 2, 3).call(); + assertEquals(3, coordWfActions.size()); + + assertEquals(wfJobs.get(1).getId(), coordWfActions.get(0).getAction().getWfId()); + assertEquals(wfJobs.get(2).getId(), coordWfActions.get(1).getAction().getWfId()); + CoordinatorWfActionBean coordWfAction = coordWfActions.get(2); + assertEquals(4, coordWfAction.getActionNumber()); + assertEquals(null, coordWfAction.getAction()); + String expectNullReason = CoordinatorWfAction.NullReason.ACTION_NULL.getNullReason("aa", wfJobs.get(3).getId()); + assertEquals(expectNullReason, coordWfAction.getNullReason()); + } + + public void testWorkflowInstanceMissing() throws CommandException { + List<CoordinatorWfActionBean> coordWfActions = new CoordWfActionInfoXCommand(coordJob.getId(), "aa", 2, 4).call(); + assertEquals(4, coordWfActions.size()); + + CoordinatorWfActionBean coordWfAction = coordWfActions.get(3); + assertEquals(5, coordWfAction.getActionNumber()); + assertEquals(null, coordWfAction.getAction()); + String expectNullReason = 
CoordinatorWfAction.NullReason.PARENT_NULL.getNullReason(); + assertEquals(expectNullReason, coordWfAction.getNullReason()); + } + + //test offset out of Range + public void testOffsetOutOfRange() throws CommandException { + List<CoordinatorWfActionBean> coordWfActions = new CoordWfActionInfoXCommand(coordJob.getId(), "aa", 6, 4).call(); + assertEquals(0, coordWfActions.size()); + } + + //test len out of Range + public void testLenOutOfRange() throws CommandException { + int offset = 2; + List<CoordinatorWfActionBean> coordWfActions = new CoordWfActionInfoXCommand(coordJob.getId(), "aa", 2, 19).call(); + assertEquals(4, coordWfActions.size()); + + for(int i = 0; i < coordWfActions.size(); i++) { + assertEquals(i + offset, coordWfActions.get(i).getActionNumber()); + } + } + + //test default offset and len + private void _testDefaultOffsetAndLen() throws CommandException { + List<CoordinatorWfActionBean> coordWfActions = new CoordWfActionInfoXCommand(coordJob.getId(), "aa").call(); + assertEquals(5, coordWfActions.size()); + + for(int i = 0; i < coordWfActions.size(); i++) { + assertEquals(i + 1, coordWfActions.get(i).getActionNumber()); + } + } +}