[
https://issues.apache.org/jira/browse/OOZIE-3715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17706778#comment-17706778
]
János Makai commented on OOZIE-3715:
------------------------------------
[~chenhd]
Thank you for the changes. The latest patch looks better, and the unit test
seems to work as expected.
I have two more minor requests:
* I think the newly created JavaDoc is only needed for
{*}org.apache.oozie.command.wf.TestSignalXCommand#testForkSubmitFail{*}.
I have rephrased it a little bit, if you do not mind:
{code:java}
/**
 * This method tests the behavior of a scenario where a fork parallel submit is
 * initiated, and one of the transitions fails, resulting in the job failing,
 * while the other transition's state is about to be checked.
 * The test verifies that a PreconditionException is thrown when action2 is in
 * the RUNNING or PREP state.
 *
 * @throws Exception if an error occurs during the test.
 */
{code}
* The spacing is still a little off in the following comment:
{code:java}
// Fork out more than one transitions ,one submit fail can't execute KillXCommand
{code}
----
Based on the current state, I will run some system tests to ensure the
change does not cause a regression. I will get back to you as soon as I have
results.
> Fix fork out more than one transitions submit , one transition submit fail
> can't execute KillXCommand
> -----------------------------------------------------------------------------------------------------
>
> Key: OOZIE-3715
> URL: https://issues.apache.org/jira/browse/OOZIE-3715
> Project: Oozie
> Issue Type: Bug
> Components: core
> Affects Versions: 5.2.1
> Reporter: chenhaodan
> Assignee: chenhaodan
> Priority: Major
> Labels: patch
> Fix For: trunk
>
> Attachments: OOZIE-3715-001.patch, OOZIE-3715-002.patch,
> OOZIE-3715-003.patch, OOZIE-3715-004.patch, forkSubmitFail_issue.txt
>
>
> When I fork two transitions (A and B) for submission and transition A fails,
> transition B keeps running because KillXCommand is never executed.
> In SignalXCommand.startForkedActions, when one transition fails to submit,
> the code creates a new ActionStartXCommand and invokes failJob on it. failJob
> adds WorkflowNotificationXCommand and KillXCommand to
> {color:#ff0000}*commandQueue*{color}, which is processed in the XCommand.call
> method. However, the two commands are added to the
> {color:#ff0000}*commandQueue*{color} of the new ActionStartXCommand, not to
> that of SignalXCommand, so KillXCommand is never executed.
> The code is as follows:
>
> {code:java}
> public void startForkedActions(List<WorkflowActionBean> workflowActionBeanListForForked)
>         throws CommandException {
>     ......
>     for (Future<ActionExecutorContext> result : futures) {
>         ......
>         if (context.getJobStatus() != null
>                 && context.getJobStatus().equals(Job.Status.FAILED)) {
>             new ActionStartXCommand(context.getAction().getId(), null).failJob(context);
>             ......
>         }
>         ......
>     }
> }
> {code}
>
> {code:java}
> public void failJob(ActionExecutor.Context context, WorkflowActionBean action)
>         throws CommandException {
>     WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
>     if (!handleUserRetry(context, action)) {
>         incrActionErrorCounter(action.getType(), "failed", 1);
>         LOG.warn("Failing Job due to failed action [{0}]", action.getName());
>         try {
>             workflow.getWorkflowInstance().fail(action.getName());
>             WorkflowInstance wfInstance = workflow.getWorkflowInstance();
>             ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.FAILED);
>             workflow.setWorkflowInstance(wfInstance);
>             workflow.setStatus(WorkflowJob.Status.FAILED);
>             action.setStatus(WorkflowAction.Status.FAILED);
>             action.resetPending();
>             queue(new WorkflowNotificationXCommand(workflow, action));
>             queue(new KillXCommand(workflow.getId()));
>             InstrumentUtils.incrJobCounter(INSTR_FAILED_JOBS_COUNTER_NAME, 1,
>                     getInstrumentation());
>         }
>         catch (WorkflowException ex) {
>             throw new CommandException(ex);
>         }
>     }
> }
> {code}
>
> {code:java}
> public final T call() throws CommandException {
>     if (commandQueue != null) {
>         for (Map.Entry<Long, List<XCommand<?>>> entry : commandQueue.entrySet()) {
>             LOG.debug("Queuing [{0}] commands with delay [{1}]ms",
>                     entry.getValue().size(), entry.getKey());
>             if (!callableQueueService.queueSerial(entry.getValue(), entry.getKey())) {
>                 LOG.warn("Could not queue [{0}] commands with delay [{1}]ms, queue full",
>                         entry.getValue().size(), entry.getKey());
>             }
>         }
>     }
> }
> {code}
>
>
>
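The queueing problem described in the issue can be illustrated with a minimal, self-contained sketch. It is not Oozie code: the classes Command, FailingAction and Signal are hypothetical stand-ins for XCommand, ActionStartXCommand and SignalXCommand, and the call method runs queued commands directly instead of handing them to a queue service. The forwardQueue flag shows one possible remedy (copying the helper's queued commands onto the running command); it is an assumption for illustration and not necessarily what the attached patches do.

```java
import java.util.ArrayList;
import java.util.List;

public class CommandQueueSketch {

    /** Hypothetical stand-in for XCommand: every instance owns its own queue. */
    static class Command {
        final String name;
        final List<Command> commandQueue = new ArrayList<>();

        Command(String name) {
            this.name = name;
        }

        void queue(Command command) {
            commandQueue.add(command);
        }

        /** Command body; the base implementation does nothing. */
        void execute() {
        }

        /** Runs this command, then only the commands queued on this instance. */
        final void call(List<String> executed) {
            execute();
            executed.add(name);
            for (Command queued : commandQueue) {
                queued.call(executed);
            }
        }
    }

    /** Hypothetical stand-in for ActionStartXCommand; failJob queues a kill. */
    static class FailingAction extends Command {
        FailingAction() {
            super("action-start");
        }

        void failJob() {
            queue(new Command("kill"));
        }
    }

    /** Hypothetical stand-in for SignalXCommand. */
    static class Signal extends Command {
        private final boolean forwardQueue;

        Signal(boolean forwardQueue) {
            super("signal");
            this.forwardQueue = forwardQueue;
        }

        @Override
        void execute() {
            // The helper is created only to call failJob; call() is never
            // invoked on it, so its queue is never processed.
            FailingAction helper = new FailingAction();
            helper.failJob();
            if (forwardQueue) {
                // Assumed remedy: move the helper's commands to this queue.
                for (Command queued : helper.commandQueue) {
                    queue(queued);
                }
            }
        }
    }

    /** Returns the names of the commands that actually ran, in order. */
    public static List<String> run(boolean forwardQueue) {
        List<String> executed = new ArrayList<>();
        new Signal(forwardQueue).call(executed);
        return executed;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // prints [signal]
        System.out.println(run(true));  // prints [signal, kill]
    }
}
```

With forwardQueue set to false the kill command is stranded on the helper's queue, which matches the behavior reported above; with it set to true the kill command runs after the signal command.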
--
This message was sent by Atlassian Jira
(v8.20.10#820010)