[ https://issues.apache.org/jira/browse/OOZIE-3717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
chenhaodan updated OOZIE-3717:
------------------------------
Description:
When the actions of a fork are submitted in parallel, a
ForkedActionStartXCommand is queued for each action. At the same time,
RecoveryService checks pending actions and may queue an ActionStartXCommand for
the same action. If the ForkedActionStartXCommand is enqueued while an
ActionStartXCommand for the same action is already in the queue, the
ForkedActionStartXCommand is dropped. The thread that submits the fork actions
in parallel blocks in CallableQueueService.blockingWait(), waiting for the
ForkedActionStartXCommand to finish. Because that command was lost, the wait
never returns and the workflow deadlocks.
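The hang described above can be illustrated with a minimal sketch. It is not Oozie code: the internals of CallableQueueService.blockingWait() are not shown in this issue, so a java.util.concurrent.CountDownLatch stands in for "wait until the command finishes", and the class and method names (LostCommandWait, waitForCommand) are hypothetical. A timeout is used only so that the example terminates.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a submitter that waits for a queued command.
class LostCommandWait {

    // Returns true if the command reported completion within the timeout.
    static boolean waitForCommand(boolean commandAccepted, long timeoutMs) {
        CountDownLatch done = new CountDownLatch(1);
        if (commandAccepted) {
            // An accepted command eventually runs and signals completion.
            new Thread(done::countDown).start();
        }
        try {
            // A dropped command never counts the latch down, so a wait
            // without a timeout would block here forever.
            return done.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("accepted command finished: " + waitForCommand(true, 2000));
        System.out.println("dropped command finished:  " + waitForCommand(false, 200));
    }
}
```

With an accepted command the wait returns promptly; with a dropped command only the timeout ends it, which corresponds to the blocked submitter thread in this report.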
{code:java}
Thread 1                          Thread 2
(ForkedActionStartXCommand)       (ActionStartXCommand)
+----------------------------+    +---------+
| removeFromUniqueCallables  |    |  .....  |
+----------------------------+    +---------+
| ......                     |    |  queue  |
+----------------------------+    +---------+
| queue                      |    enqueue succeeded, in uniqueCallables
+----------------------------+
| wrapper.filterDuplicates() |
+----------------------------+

Order in which Thread 1 and Thread 2 execute CallableWrapper's run() method:
1. Thread 1 executes removeFromUniqueCallables();
2. Thread 2 calls queue(), which adds ActionStartXCommand to the queue and to
   uniqueCallables;
3. Thread 1 calls queue() to add ForkedActionStartXCommand to the queue, but
   filterDuplicates() finds an XCommand with the same name in uniqueCallables,
   so the command is not added to the queue.

Because ForkedActionStartXCommand and ActionStartXCommand have the same name
and Thread 2 enqueues ActionStartXCommand before Thread 1 does,
ForkedActionStartXCommand is lost (it never executes), and the thread that
submits the fork actions in parallel blocks at
CallableQueueService.blockingWait().
{code}
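The three steps above can be replayed deterministically in a small model. This is a simplified sketch, not Oozie's CallableQueueService: the class DuplicateFilterRace, its methods, and the shared name string are assumptions made for illustration, and the duplicate check is reduced to a plain set lookup.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

// Hypothetical, single-threaded model of a queue that filters duplicates by name.
class DuplicateFilterRace {
    // Names of callables currently registered as queued.
    private final Set<String> uniqueCallables = new HashSet<>();
    // Labels of the commands that were actually accepted.
    private final Queue<String> queue = new ArrayDeque<>();

    // Accepts the command only if no callable with the same name is registered.
    boolean queue(String name, String label) {
        if (!uniqueCallables.add(name)) {
            return false; // filtered as a duplicate and silently dropped
        }
        queue.add(label);
        return true;
    }

    void removeFromUniqueCallables(String name) {
        uniqueCallables.remove(name);
    }

    // Replays steps 1 to 3 in a fixed order; returns whether the forked command was accepted.
    static boolean replay() {
        DuplicateFilterRace q = new DuplicateFilterRace();
        String name = "action.start_example"; // hypothetical name shared by both commands
        q.uniqueCallables.add(name);          // the forked command was registered when first queued
        q.removeFromUniqueCallables(name);    // step 1 (Thread 1)
        q.queue(name, "ActionStartXCommand"); // step 2 (Thread 2)
        return q.queue(name, "ForkedActionStartXCommand"); // step 3 (Thread 1)
    }

    public static void main(String[] args) {
        System.out.println("forked command accepted: " + replay());
    }
}
```

In this model the third call returns false: the name is already registered by the command enqueued in step 2, so the forked command never reaches the queue.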
*CallableWrapper's code*
{code:java}
public class CallableWrapper<E> extends PriorityDelayQueue.QueueElement<E>
        implements Runnable, Callable<E> {
    private Instrumentation.Cron cron;

    public void run() {
        XCallable<?> callable = null;
        try {
            // Step 1 in the sequence above: once this returns, another thread
            // can enqueue a callable with the same name.
            removeFromUniqueCallables();
            if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
                log.info("Oozie is in SAFEMODE, requeuing callable [{0}] with [{1}]ms delay",
                        getElement().getType(), SAFE_MODE_DELAY);
                setDelay(SAFE_MODE_DELAY, TimeUnit.MILLISECONDS);
                queue(this, true);
                return;
            }
            callable = getElement();
            if (callableBegin(callable)) {
                cron.stop();
                addInQueueCron(cron);
                XLog log = XLog.getLog(getClass());
                log.trace("executing callable [{0}]", callable.getName());
                try {
                    // FutureTask.run() will invoke callable.call()
                    super.run();
                    incrCounter(INSTR_EXECUTED_COUNTER, 1);
                    log.trace("executed callable [{0}]", callable.getName());
                }
                catch (Exception ex) {
                    incrCounter(INSTR_FAILED_COUNTER, 1);
                    log.warn("exception callable [{0}], {1}",
                            callable.getName(), ex.getMessage(), ex);
                }
            }
            else {
                log.warn("max concurrency for callable [{0}] exceeded, requeueing with [{1}]ms delay",
                        callable.getType(), CONCURRENCY_DELAY);
                setDelay(CONCURRENCY_DELAY, TimeUnit.MILLISECONDS);
                queue(this, true);
                incrCounter(callable.getType() + "#exceeded.concurrency", 1);
            }
        }
        catch (Throwable t) {
            incrCounter(INSTR_FAILED_COUNTER, 1);
            log.warn("exception callable [{0}], {1}",
                    callable == null ? "N/A" : callable.getName(),
                    t.getMessage(), t);
        }
        finally {
            if (callable != null) {
                callableEnd(callable);
            }
        }
    }
}
{code}
> When fork actions are submitted in parallel, ForkedActionStartXCommand and
> ActionStartXCommand have the same name, so ForkedActionStartXCommand can be
> lost, causing a deadlock
> ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: OOZIE-3717
> URL: https://issues.apache.org/jira/browse/OOZIE-3717
> Project: Oozie
> Issue Type: Bug
> Components: action
> Affects Versions: 5.2.1
> Reporter: chenhaodan
> Assignee: chenhaodan
> Priority: Major
> Fix For: trunk
>
> Attachments: OOZIE-3717-001.patch
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)