[
https://issues.apache.org/jira/browse/OOZIE-3717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751270#comment-17751270
]
chenhaodan edited comment on OOZIE-3717 at 8/7/23 8:20 AM:
-----------------------------------------------------------
[~dionusos] I am sorry about that. I have fixed it in [^OOZIE-3717-003.patch].
Thanks for your time.
was (Author: chenhd):
[~dionusos] I am sorry about that. I have fixed them in [^OOZIE-3717-003.patch].
Thanks for your time.
> When fork actions are submitted in parallel, ForkedActionStartXCommand can be
> lost because it has the same name as ActionStartXCommand, causing a deadlock
> ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: OOZIE-3717
> URL: https://issues.apache.org/jira/browse/OOZIE-3717
> Project: Oozie
> Issue Type: Bug
> Components: action
> Affects Versions: 5.2.1
> Reporter: chenhaodan
> Assignee: chenhaodan
> Priority: Major
> Attachments: OOZIE-3717-001.patch, OOZIE-3717-002.patch,
> OOZIE-3717-003.patch
>
>
> When fork actions are submitted in parallel, a ForkedActionStartXCommand is
> queued for each action, while RecoveryService, which checks for pending actions,
> may queue an ActionStartXCommand for the same action. If an ActionStartXCommand
> for that action is already in the queue when the ForkedActionStartXCommand is
> enqueued, the ForkedActionStartXCommand is dropped. The thread submitting the
> actions in parallel blocks in CallableQueueService.blockingWait(), waiting for
> the ForkedActionStartXCommand to finish; because that command was lost, it
> never finishes, and the thread deadlocks.
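The mechanism described above can be modeled in a few lines. This is a minimal single-threaded sketch, not Oozie code: ForkWaitModel, queue, drain, forkWaitCompletes and the "start:<action>" keys are hypothetical stand-ins for CallableQueueService's duplicate filter, assuming the filter compares one key per callable. A CountDownLatch with a timeout stands in for CallableQueueService.blockingWait() so the demo terminates instead of hanging.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical model (not Oozie's API): a queue that drops a callable whose key is
// already queued, and a fork submitter that waits for all of its forked callables.
final class ForkWaitModel {
    private final Set<String> uniqueCallables = new LinkedHashSet<>();
    private final List<Runnable> pending = new ArrayList<>();

    // Stand-in for queue() + filterDuplicates(): a duplicate key is silently dropped.
    boolean queue(String key, Runnable callable) {
        if (!uniqueCallables.add(key)) {
            return false;
        }
        pending.add(() -> {
            uniqueCallables.remove(key); // stand-in for removeFromUniqueCallables()
            callable.run();
        });
        return true;
    }

    // Runs every queued callable once, in FIFO order.
    void drain() {
        List<Runnable> batch = new ArrayList<>(pending);
        pending.clear();
        batch.forEach(Runnable::run);
    }

    // Returns true if the submitter's wait completes within timeoutMs.
    static boolean forkWaitCompletes(boolean recoveryQueuedFirst, long timeoutMs) {
        ForkWaitModel service = new ForkWaitModel();
        String[] actions = {"action-1", "action-2"}; // hypothetical action ids
        if (recoveryQueuedFirst) {
            // A RecoveryService-style start command for action-2 that uses the same key.
            service.queue("start:action-2", () -> { });
        }
        CountDownLatch forkedDone = new CountDownLatch(actions.length);
        for (String action : actions) {
            // Each forked start command counts down only if it actually runs.
            service.queue("start:" + action, forkedDone::countDown);
        }
        service.drain();
        try {
            // Stand-in for blockingWait(); bounded here so the demo does not hang.
            return forkedDone.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("no recovery command:   " + forkWaitCompletes(false, 100));
        System.out.println("recovery queued first: " + forkWaitCompletes(true, 100));
    }
}
```

With no recovery command the wait completes; when a start command with the same key is queued first, the forked command for action-2 is dropped and the latch never reaches zero, so the bounded wait times out.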
> {code:java}
> Thread 1                               Thread 2
> (ForkedActionStartXCommand)            (ActionStartXCommand)
> +----------------------------+         +---------+
> | removeFromUniqueCallables  |         | .....   |
> +----------------------------+         +---------+
> | ......                     |         | queue   |
> +----------------------------+         +---------+
> | queue                      |         enqueue succeeded, added to uniqueCallables
> +----------------------------+
> | wrapper.filterDuplicates() |
> +----------------------------+
> {code}
> Thread 1 and Thread 2 execute CallableWrapper's code in the following order:
> 1. Thread 1 executes removeFromUniqueCallables();
> 2. Thread 2 executes queue(), which adds ActionStartXCommand to the queue and to
> uniqueCallables;
> 3. Thread 1 executes queue() to add ForkedActionStartXCommand, but
> filterDuplicates() finds an XCommand with the same name in uniqueCallables, so
> ForkedActionStartXCommand is not added to the queue.
>
> Because ForkedActionStartXCommand and ActionStartXCommand have the same name,
> and Thread 2 enqueues ActionStartXCommand before Thread 1 enqueues
> ForkedActionStartXCommand, the ForkedActionStartXCommand is lost (never
> executed), and the thread that submits the fork actions in parallel blocks in
> CallableQueueService.blockingWait().
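The three steps can be replayed with two real threads, using latches to force the order described above. This is a hypothetical sketch that does not use Oozie classes: InterleavingReplay, its queue method and the shared key are stand-ins, assuming the duplicate check is a set-membership test on a key derived from the command name.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

// Hypothetical two-thread replay of the interleaving; names are stand-ins, not Oozie's API.
final class InterleavingReplay {
    // Stand-in for uniqueCallables; both commands map to one key because they share a name.
    private final Set<String> uniqueCallables = ConcurrentHashMap.newKeySet();

    // Stand-in for queue() + filterDuplicates(): returns false when the key is already present.
    private boolean queue(String key) {
        return uniqueCallables.add(key);
    }

    // Returns whether Thread 1's requeue of the forked command was accepted.
    static boolean forkedRequeueAccepted() {
        InterleavingReplay service = new InterleavingReplay();
        String key = "action.start@action-1"; // hypothetical shared key
        service.uniqueCallables.add(key);     // the forked command was queued earlier

        CountDownLatch step1Done = new CountDownLatch(1);
        CountDownLatch step2Done = new CountDownLatch(1);
        boolean[] accepted = new boolean[1];

        Thread thread1 = new Thread(() -> {
            service.uniqueCallables.remove(key); // step 1: removeFromUniqueCallables()
            step1Done.countDown();
            awaitQuietly(step2Done);             // force step 2 to happen before step 3
            accepted[0] = service.queue(key);    // step 3: requeue is filtered as a duplicate
        });
        Thread thread2 = new Thread(() -> {
            awaitQuietly(step1Done);
            service.queue(key);                  // step 2: the recovery command is enqueued
            step2Done.countDown();
        });
        thread1.start();
        thread2.start();
        joinQuietly(thread1);
        joinQuietly(thread2);
        return accepted[0]; // join() makes thread1's write visible here
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private static void joinQuietly(Thread thread) {
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("forked requeue accepted: " + forkedRequeueAccepted());
    }
}
```

forkedRequeueAccepted() returns false: Thread 2's enqueue lands in the window between steps 1 and 3, so Thread 1's requeue is filtered as a duplicate.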
>
> *CallableWrapper's code*
> {code:java}
> public class CallableWrapper<E> extends PriorityDelayQueue.QueueElement<E>
>         implements Runnable, Callable<E> {
>     // Excerpt: other members omitted.
>     private Instrumentation.Cron cron;
>
>     public void run() {
>         XCallable<?> callable = null;
>         try {
>             removeFromUniqueCallables();
>             if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
>                 log.info("Oozie is in SAFEMODE, requeuing callable [{0}] with [{1}]ms delay",
>                         getElement().getType(), SAFE_MODE_DELAY);
>                 setDelay(SAFE_MODE_DELAY, TimeUnit.MILLISECONDS);
>                 queue(this, true);
>                 return;
>             }
>             callable = getElement();
>             if (callableBegin(callable)) {
>                 cron.stop();
>                 addInQueueCron(cron);
>                 XLog log = XLog.getLog(getClass());
>                 log.trace("executing callable [{0}]", callable.getName());
>                 try {
>                     // FutureTask.run() will invoke callable.call()
>                     super.run();
>                     incrCounter(INSTR_EXECUTED_COUNTER, 1);
>                     log.trace("executed callable [{0}]", callable.getName());
>                 }
>                 catch (Exception ex) {
>                     incrCounter(INSTR_FAILED_COUNTER, 1);
>                     log.warn("exception callable [{0}], {1}", callable.getName(), ex.getMessage(), ex);
>                 }
>             }
>             else {
>                 log.warn("max concurrency for callable [{0}] exceeded, requeueing with [{1}]ms delay",
>                         callable.getType(), CONCURRENCY_DELAY);
>                 setDelay(CONCURRENCY_DELAY, TimeUnit.MILLISECONDS);
>                 queue(this, true);
>                 incrCounter(callable.getType() + "#exceeded.concurrency", 1);
>             }
>         }
>         catch (Throwable t) {
>             incrCounter(INSTR_FAILED_COUNTER, 1);
>             log.warn("exception callable [{0}], {1}", callable == null ? "N/A" : callable.getName(),
>                     t.getMessage(), t);
>         }
>         finally {
>             if (callable != null) {
>                 callableEnd(callable);
>             }
>         }
>     }
> }
> {code}
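Since the collision comes from the shared name, one possible direction is to make the dedup key of ForkedActionStartXCommand differ from that of ActionStartXCommand. The sketch below only illustrates that idea with hypothetical names (DedupKeySketch, key, forkedAccepted, and the command names "action.start" and "action.forkedstart"). It is not necessarily what OOZIE-3717-003.patch does, and it does not model whether letting both commands run for the same action is safe, which would depend on Oozie's own action state checks.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: dedup keys built from a command name plus an action id.
final class DedupKeySketch {
    static String key(String commandName, String actionId) {
        return commandName + "@" + actionId;
    }

    // Queues a recovery start command, then a forked start command for the same action,
    // and reports whether the forked one survives the duplicate filter.
    static boolean forkedAccepted(String recoveryName, String forkedName) {
        Set<String> uniqueCallables = new HashSet<>();
        String actionId = "action-1"; // hypothetical action id
        uniqueCallables.add(key(recoveryName, actionId));
        return uniqueCallables.add(key(forkedName, actionId));
    }

    public static void main(String[] args) {
        // Shared name: the forked command collides with the recovery command and is dropped.
        System.out.println(forkedAccepted("action.start", "action.start"));
        // Distinct names: the keys differ, so the forked command is queued.
        System.out.println(forkedAccepted("action.start", "action.forkedstart"));
    }
}
```

The trade-off is that both commands may now be queued for one action, so the second one to run has to tolerate finding the action already started.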
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)