[ https://issues.apache.org/jira/browse/OOZIE-3717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
chenhaodan updated OOZIE-3717:
------------------------------
    Attachment: OOZIE-3717-003.patch

> When fork actions parallel submit, because ForkedActionStartXCommand and ActionStartXCommand have the same name, ForkedActionStartXCommand would be lost, and cause deadlock
> ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: OOZIE-3717
>                 URL: https://issues.apache.org/jira/browse/OOZIE-3717
>             Project: Oozie
>          Issue Type: Bug
>          Components: action
>    Affects Versions: 5.2.1
>            Reporter: chenhaodan
>            Assignee: chenhaodan
>            Priority: Major
>         Attachments: OOZIE-3717-001.patch, OOZIE-3717-002.patch, OOZIE-3717-003.patch
>
>
> When fork actions are submitted in parallel, a ForkedActionStartXCommand is added to the queue. Meanwhile, RecoveryService checks pending actions and may add an ActionStartXCommand for the same action. If the ForkedActionStartXCommand is enqueued while an ActionStartXCommand for the same action is already in the queue, the ForkedActionStartXCommand is dropped. The thread that submits the fork actions in parallel blocks in CallableQueueService.blockingWait(), waiting for the ForkedActionStartXCommand to finish, but that command has been lost, which causes a deadlock.
> {code:java}
>           Thread 1                          Thread 2
>  (ForkedActionStartXCommand)          (ActionStartXCommand)
> +----------------------------+           +---------+
> | removeFromUniqueCallables  |           |  .....  |
> +----------------------------+           +---------+
> |           ......           |           |  queue  |
> +----------------------------+           +---------+
> |           queue            |     enqueue succeeded, in uniqueCallables
> +----------------------------+
> | wrapper.filterDuplicates() |
> +----------------------------+
> Thread 1 and Thread 2 execute CallableWrapper's functions in the following order:
> 1. Thread 1 executes removeFromUniqueCallables;
> 2. Thread 2 executes queue, which adds ActionStartXCommand to the queue and to uniqueCallables;
> 3. Thread 1 executes queue to add ForkedActionStartXCommand to the queue, but filterDuplicates() finds an XCommand with the same name in uniqueCallables, so the command is not added to the queue;
> Because ForkedActionStartXCommand and ActionStartXCommand have the same name, and Thread 2 enqueues ActionStartXCommand before Thread 1, ForkedActionStartXCommand is lost (never executed), and the thread that submits the fork actions in parallel blocks in CallableQueueService.blockingWait().
> {code}
>
> *CallableWrapper's code*
> {code:java}
> public class CallableWrapper<E> extends PriorityDelayQueue.QueueElement<E> implements Runnable, Callable<E> {
>     private Instrumentation.Cron cron;
>
>     public void run() {
>         XCallable<?> callable = null;
>         try {
>             removeFromUniqueCallables();
>             if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
>                 log.info("Oozie is in SAFEMODE, requeuing callable [{0}] with [{1}]ms delay", getElement().getType(),
>                         SAFE_MODE_DELAY);
>                 setDelay(SAFE_MODE_DELAY, TimeUnit.MILLISECONDS);
>                 queue(this, true);
>                 return;
>             }
>             callable = getElement();
>             if (callableBegin(callable)) {
>                 cron.stop();
>                 addInQueueCron(cron);
>                 XLog log = XLog.getLog(getClass());
>                 log.trace("executing callable [{0}]", callable.getName());
>                 try {
>                     // FutureTask.run() will invoke callable.call()
>                     super.run();
>                     incrCounter(INSTR_EXECUTED_COUNTER, 1);
>                     log.trace("executed callable [{0}]", callable.getName());
>                 }
>                 catch (Exception ex) {
>                     incrCounter(INSTR_FAILED_COUNTER, 1);
>                     log.warn("exception callable [{0}], {1}", callable.getName(), ex.getMessage(), ex);
>                 }
>             }
>             else {
>                 log.warn("max concurrency for callable [{0}] exceeded, requeueing with [{1}]ms delay", callable
>                         .getType(), CONCURRENCY_DELAY);
>                 setDelay(CONCURRENCY_DELAY, TimeUnit.MILLISECONDS);
>                 queue(this, true);
>                 incrCounter(callable.getType() + "#exceeded.concurrency", 1);
>             }
>         }
>         catch (Throwable t) {
>             incrCounter(INSTR_FAILED_COUNTER, 1);
>             log.warn("exception callable [{0}], {1}", callable == null ? "N/A" : callable.getName(),
>                     t.getMessage(), t);
>         }
>         finally {
>             if (callable != null) {
>                 callableEnd(callable);
>             }
>         }
>     }
> }
> {code}
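The interleaving described in this issue can be replayed deterministically outside Oozie. The following is only a minimal sketch under stated assumptions: `DuplicateKeySketch`, `Command`, `UniqueQueue` and `forkedCommandRuns` are hypothetical names invented for illustration, not Oozie classes or methods, and a plain `HashSet` stands in for uniqueCallables. It replays the three steps in a fixed order on a single thread and reports whether the forked command ever runs.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

public class DuplicateKeySketch {

    /** Hypothetical stand-in for a queued command (not an Oozie class). */
    static final class Command {
        final String type;
        final String name;          // the name the queue uses to detect duplicates
        boolean executed = false;

        Command(String type, String name) {
            this.type = type;
            this.name = name;
        }
    }

    /** Hypothetical queue that silently drops a command whose name is already registered. */
    static final class UniqueQueue {
        private final Set<String> uniqueNames = new HashSet<>();
        private final Queue<Command> pending = new ArrayDeque<>();

        /** Returns true if the command was enqueued, false if it was filtered as a duplicate. */
        boolean queue(Command c) {
            if (!uniqueNames.add(c.name)) {
                return false;
            }
            pending.add(c);
            return true;
        }

        /** Removes the command's name from the set of registered names. */
        void removeFromUnique(Command c) {
            uniqueNames.remove(c.name);
        }

        /** Runs every command that is still pending. */
        void drain() {
            Command c;
            while ((c = pending.poll()) != null) {
                uniqueNames.remove(c.name);
                c.executed = true;
            }
        }
    }

    /** Replays steps 1-3 in order and reports whether the forked command ever ran. */
    static boolean forkedCommandRuns(String forkedName, String recoveryName) {
        UniqueQueue q = new UniqueQueue();
        Command forked = new Command("ForkedActionStartXCommand", forkedName);
        Command recovery = new Command("ActionStartXCommand", recoveryName);

        q.removeFromUnique(forked);   // step 1: thread 1 clears its entry
        q.queue(recovery);            // step 2: thread 2 enqueues and registers its name
        q.queue(forked);              // step 3: thread 1 enqueues; dropped if the name matches
        q.drain();
        return forked.executed;
    }

    public static void main(String[] args) {
        System.out.println("same name:      forked ran = "
                + forkedCommandRuns("start-action-1", "start-action-1"));
        System.out.println("distinct names: forked ran = "
                + forkedCommandRuns("forked-start-action-1", "start-action-1"));
    }
}
```

With the same name, the forked command is filtered out and never runs, so anything waiting for it would wait forever. The distinct-name case is included only as a contrast; it is not a description of what the attached patches change.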