[ 
https://issues.apache.org/jira/browse/OOZIE-3715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chenhaodan updated OOZIE-3715:
------------------------------
    Description: 

When a fork submits two transitions (A and B) and transition A fails, 
transition B keeps running, because the KillXCommand is never executed.

In SignalXCommand.startForkedActions, when the submission of one transition 
fails, a new ActionStartXCommand is created and failJob is invoked on it. 
failJob adds a WorkflowNotificationXCommand and a KillXCommand to the 
{color:#ff0000}*commandQueue*{color}, which is processed in the 
XCommand.call method. However, these commands are added to the new 
ActionStartXCommand's {color:#ff0000}*commandQueue*{color}, not to the 
SignalXCommand's, so the KillXCommand is never executed.

The code is as follows :

 
{code:java}
    public void startForkedActions(List<WorkflowActionBean> 
workflowActionBeanListForForked) throws CommandException {

        ......

            for (Future<ActionExecutorContext> result : futures) {
             ......
                if (context.getJobStatus() != null && 
context.getJobStatus().equals(Job.Status.FAILED)) {

                    new ActionStartXCommand(context.getAction().getId(), 
null).failJob(context);
             ......

        }
       ......
    }
{code}
 
{code:java}
public void failJob(ActionExecutor.Context context, WorkflowActionBean action) 
throws CommandException {
        WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
        if (!handleUserRetry(context, action)) {
            incrActionErrorCounter(action.getType(), "failed", 1);
            LOG.warn("Failing Job due to failed action [{0}]", 
action.getName());
            try {
                workflow.getWorkflowInstance().fail(action.getName());
                WorkflowInstance wfInstance = workflow.getWorkflowInstance();
                ((LiteWorkflowInstance) 
wfInstance).setStatus(WorkflowInstance.Status.FAILED);
                workflow.setWorkflowInstance(wfInstance);
                workflow.setStatus(WorkflowJob.Status.FAILED);
                action.setStatus(WorkflowAction.Status.FAILED);
                action.resetPending();
                queue(new WorkflowNotificationXCommand(workflow, action));
                queue(new KillXCommand(workflow.getId()));             
InstrumentUtils.incrJobCounter(INSTR_FAILED_JOBS_COUNTER_NAME, 1, 
getInstrumentation());
            }
            catch (WorkflowException ex) {
                throw new CommandException(ex);
            }
        }
    }

{code}
 
{code:java}
public final T call() throws CommandException {
    if (commandQueue != null) {
        for (Map.Entry<Long, List<XCommand<?>>> entry : 
commandQueue.entrySet()) {
            LOG.debug("Queuing [{0}] commands with delay [{1}]ms", 
entry.getValue().size(), entry.getKey());
            if (!callableQueueService.queueSerial(entry.getValue(), 
entry.getKey())) {
                LOG.warn("Could not queue [{0}] commands with delay [{1}]ms, 
queue full", entry.getValue()
                    .size(), entry.getKey());
            }
        }
     } 
}
{code}
 

 

 

  was:
When a fork submits two transitions (A and B) and A fails, B keeps running, 
because the KillXCommand is never executed.
ActionXCommand executes failJob and adds a KillXCommand to a commandQueue, but 
that commandQueue belongs to the newly created ActionXCommand instance, not to 
the SignalXCommand, so the KillXCommand is never executed. The code is as 
follows:

 
{code:java}
new ActionStartXCommand(context.getAction().getId(), null).failJob(context)

public void failJob(ActionExecutor.Context context, WorkflowActionBean action) 
throws CommandException {
        WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
        if (!handleUserRetry(context, action)) {
            incrActionErrorCounter(action.getType(), "failed", 1);
            LOG.warn("Failing Job due to failed action [{0}]", 
action.getName());
            try {
                workflow.getWorkflowInstance().fail(action.getName());
                WorkflowInstance wfInstance = workflow.getWorkflowInstance();
                ((LiteWorkflowInstance) 
wfInstance).setStatus(WorkflowInstance.Status.FAILED);
                workflow.setWorkflowInstance(wfInstance);
                workflow.setStatus(WorkflowJob.Status.FAILED);
                action.setStatus(WorkflowAction.Status.FAILED);
                action.resetPending();                queue(new 
WorkflowNotificationXCommand(workflow, action));
                queue(new KillXCommand(workflow.getId()));                
InstrumentUtils.incrJobCounter(INSTR_FAILED_JOBS_COUNTER_NAME, 1, 
getInstrumentation());
            }
            catch (WorkflowException ex) {
                throw new CommandException(ex);
            }
        }
    }{code}
 

 


> Fix: when a fork submits more than one transition and one submission fails, 
> KillXCommand is not executed
> -----------------------------------------------------------------------------------------------------
>
>                 Key: OOZIE-3715
>                 URL: https://issues.apache.org/jira/browse/OOZIE-3715
>             Project: Oozie
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 5.2.1
>            Reporter: chenhaodan
>            Priority: Major
>              Labels: patch
>             Fix For: 5.2.1
>
>         Attachments: OOZIE-3715.patch
>
>
> When a fork submits two transitions (A and B) and transition A fails, 
> transition B keeps running, because the KillXCommand is never executed.
> In SignalXCommand.startForkedActions, when the submission of one transition 
> fails, a new ActionStartXCommand is created and failJob is invoked on it. 
> failJob adds a WorkflowNotificationXCommand and a KillXCommand to the 
> {color:#ff0000}*commandQueue*{color}, which is processed in the 
> XCommand.call method. However, these commands are added to the new 
> ActionStartXCommand's {color:#ff0000}*commandQueue*{color}, not to the 
> SignalXCommand's, so the KillXCommand is never executed.
> The code is as follows :
>  
> {code:java}
>     public void startForkedActions(List<WorkflowActionBean> 
> workflowActionBeanListForForked) throws CommandException {
>         ......
>             for (Future<ActionExecutorContext> result : futures) {
>              ......
>                 if (context.getJobStatus() != null && 
> context.getJobStatus().equals(Job.Status.FAILED)) {
>                     new ActionStartXCommand(context.getAction().getId(), 
> null).failJob(context);
>              ......
>         }
>        ......
>     }
> {code}
>  
> {code:java}
> public void failJob(ActionExecutor.Context context, WorkflowActionBean 
> action) throws CommandException {
>         WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
>         if (!handleUserRetry(context, action)) {
>             incrActionErrorCounter(action.getType(), "failed", 1);
>             LOG.warn("Failing Job due to failed action [{0}]", 
> action.getName());
>             try {
>                 workflow.getWorkflowInstance().fail(action.getName());
>                 WorkflowInstance wfInstance = workflow.getWorkflowInstance();
>                 ((LiteWorkflowInstance) 
> wfInstance).setStatus(WorkflowInstance.Status.FAILED);
>                 workflow.setWorkflowInstance(wfInstance);
>                 workflow.setStatus(WorkflowJob.Status.FAILED);
>                 action.setStatus(WorkflowAction.Status.FAILED);
>                 action.resetPending();
>                 queue(new WorkflowNotificationXCommand(workflow, action));
>                 queue(new KillXCommand(workflow.getId()));             
> InstrumentUtils.incrJobCounter(INSTR_FAILED_JOBS_COUNTER_NAME, 1, 
> getInstrumentation());
>             }
>             catch (WorkflowException ex) {
>                 throw new CommandException(ex);
>             }
>         }
>     }
> {code}
>  
> {code:java}
> public final T call() throws CommandException {
>     if (commandQueue != null) {
>         for (Map.Entry<Long, List<XCommand<?>>> entry : 
> commandQueue.entrySet()) {
>             LOG.debug("Queuing [{0}] commands with delay [{1}]ms", 
> entry.getValue().size(), entry.getKey());
>             if (!callableQueueService.queueSerial(entry.getValue(), 
> entry.getKey())) {
>                 LOG.warn("Could not queue [{0}] commands with delay [{1}]ms, 
> queue full", entry.getValue()
>                     .size(), entry.getKey());
>             }
>         }
>      } 
> }
> {code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to