This is an automated email from the ASF dual-hosted git repository. dionusos pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/oozie.git
The following commit(s) were added to refs/heads/master by this push:
     new 3c614c74c OOZIE-3717 When fork actions parallel submit, becasue ForkedActionStartXCommand and ActionStartXCommand has the same name, so ForkedActionStartXCommand would be lost, and cause deadlock (chenhd via dionusos)
3c614c74c is described below

commit 3c614c74cb5cb8897a2f95334b5e467227edf740
Author: Denes Bodo <dionu...@apache.org>
AuthorDate: Mon Aug 7 16:19:24 2023 +0200

    OOZIE-3717 When fork actions parallel submit, becasue ForkedActionStartXCommand and ActionStartXCommand has the same name, so ForkedActionStartXCommand would be lost, and cause deadlock (chenhd via dionusos)
---
 .../oozie/command/wf/ActionStartXCommand.java      |  14 +-
 .../command/wf/ForkedActionStartXCommand.java      |   7 +-
 .../oozie/command/wf/TestSignalXCommand.java       | 160 ++++++++++++++++++++-
 release-log.txt                                    |   1 +
 4 files changed, 175 insertions(+), 7 deletions(-)

diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
index 387cbbedb..d59d78f59 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
@@ -56,7 +56,6 @@ import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.UUIDService;
-import org.apache.oozie.util.DateUtils;
 import org.apache.oozie.util.ELEvaluationException;
 import org.apache.oozie.util.Instrumentation;
 import org.apache.oozie.util.JobUtils;
@@ -96,6 +95,19 @@ public class ActionStartXCommand extends ActionXCommand<org.apache.oozie.command
         this.jobId = wfJob.getId();
     }
+    public ActionStartXCommand(String actionId, String type, String name) {
+        super(name, type, 0);
+        this.actionId = actionId;
+        this.jobId = Services.get().get(UUIDService.class).getId(actionId);
+    }
+
+    public ActionStartXCommand(WorkflowJobBean job, String actionId, String type, String name) {
+        super(name, type, 0);
+        this.actionId = actionId;
+        this.wfJob = job;
+        this.jobId = wfJob.getId();
+    }
+
     @Override
     protected void setLogInfo() {
         LogUtils.setLogInfo(actionId);
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ForkedActionStartXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ForkedActionStartXCommand.java
index 91da0b8f4..f5e27e96b 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ForkedActionStartXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ForkedActionStartXCommand.java
@@ -25,16 +25,17 @@ import org.apache.oozie.client.Job;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.XCommand;
-import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
 public class ForkedActionStartXCommand extends ActionStartXCommand {
+    private final static String FORKED_ACTION_START_NAME = "action.forkedstart";
+
     public ForkedActionStartXCommand(String actionId, String type) {
-        super(actionId, type);
+        super(actionId, type, FORKED_ACTION_START_NAME);
     }
     public ForkedActionStartXCommand(WorkflowJobBean wfJob, String id, String type) {
-        super(wfJob, id, type);
+        super(wfJob, id, type, FORKED_ACTION_START_NAME);
     }
     protected ActionExecutorContext execute() throws CommandException {
diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java
index 9ffe2d543..d640150e0 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java
@@ -45,10 +45,8 @@ import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
-import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
 import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobsGetForPurgeJPAExecutor;
 import org.apache.oozie.local.LocalOozie;
 import org.apache.oozie.service.CallableQueueService;
 import org.apache.oozie.service.ConfigurationService;
@@ -62,8 +60,15 @@ import org.apache.oozie.test.XDataTestCase;
 import org.apache.oozie.util.IOUtils;
 import org.apache.oozie.util.XConfiguration;
 import org.apache.oozie.workflow.lite.LiteWorkflowAppParser;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.oozie.util.XLog;
+import org.apache.oozie.util.XCallable;
+import org.apache.oozie.service.RecoveryService;
+
-import javax.persistence.EntityManager;
 public class TestSignalXCommand extends XDataTestCase {
@@ -554,4 +559,153 @@ public class TestSignalXCommand extends XDataTestCase {
             }
         }
     }
+
+    /**
+     * for test {@link #testDeadlockForForkParallelSubmit()}
+     */
+    public static class TestRecoverForkStartActionCallableQueueService extends CallableQueueService{
+        private final XLog log = XLog.getLog(getClass());
+        public static TestSignalXCommand testSignalXCommand;
+
+        /**
+         * Overwrite for test the same action's ActionStartXCommand in queue and the ForkedActionStartXCommand wouldn't lose,
+         * if ActionStartXCommand and ForkedActionStartXCommand has the same name, ForkedActionStartXCommand couldn't enqueue
+         * after a ActionStartXCommand in queue waiting for running.
+         */
+        public class CallableWrapper<E> extends CallableQueueService.CallableWrapper<E> {
+
+            boolean forkedActionStartXCommandFirstEnter;
+            public CallableWrapper(XCallable<E> callable, long delay) {
+                super(callable,delay);
+                forkedActionStartXCommandFirstEnter = callable instanceof ForkedActionStartXCommand;
+            }
+
+
+            public void run() {
+                XCallable<?> callable = getElement();
+                if (forkedActionStartXCommandFirstEnter && callable instanceof ForkedActionStartXCommand){
+                    // make sure there has a ActionStartXCommand in the queue wait to run,
+                    // and then ForkedActionStartXCommand enqueue
+                    testSignalXCommand.waitFor(15 * 1000, new Predicate() {
+                        @Override
+                        public boolean evaluate() throws Exception {
+                            return !filterDuplicates();
+                        }
+                    },200);
+
+                    log.warn("max concurrency for callable [{0}] exceeded, enqueueing with [{1}]ms delay", callable
+                            .getType(), CONCURRENCY_DELAY);
+                    setDelay(CONCURRENCY_DELAY, TimeUnit.MILLISECONDS);
+
+                    try {
+                        Method queue = CallableQueueService.class.getDeclaredMethod("queue",
+                                CallableQueueService.CallableWrapper.class, boolean.class);
+                        queue.setAccessible(true);
+                        queue.invoke(TestRecoverForkStartActionCallableQueueService.this,this,true);
+                    } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
+                        throw new RuntimeException(e);
+                    }
+                    forkedActionStartXCommandFirstEnter = false;
+                }else {
+                    super.run();
+                }
+            }
+        }
+
+        /**
+         * Replace CallableQueueService.CallableWrapper to TestRecoverForkStartActionCallableQueueService.CallableWrapper
+         * for text, in order to call TestRecoverForkStartActionCallableQueueService.CallableWrapper for wait
+         * ActionStartXCommand enqueue before.
+         *
+         */
+        public <T> Future<T> submit(CallableQueueService.CallableWrapper<T> task) throws InterruptedException {
+            return super.submit(new TestRecoverForkStartActionCallableQueueService.CallableWrapper<T>(task.getElement(),
+                    task.getInitialDelay()));
+        }
+    }
+
+
+    /**
+     * Test : fork parallel submit, the action has the same XCommand in queue, there will skip enqueue by
+     * {@link CallableQueueService.CallableWrapper#filterDuplicates()} so if the ActionStartXCommand and
+     * ForkedActionStartXCommand has the same name, it would be lost.
+     *
+     * Note : RecoveryService will check the pending action and try to start it. So if the action's ForkedActionStartXCommand
+     * wait for run, there may be a ActionStartXCommand add for the same action.
+     *
+     */
+    public void testDeadlockForForkParallelSubmit() throws Exception {
+        setSystemProperty(Services.CONF_SERVICE_EXT_CLASSES, TestRecoverForkStartActionCallableQueueService.class.getName());
+        TestRecoverForkStartActionCallableQueueService.testSignalXCommand = this;
+
+        services = new Services();
+        Configuration servicesConf = services.getConf();
+        servicesConf.setInt(RecoveryService.CONF_WF_ACTIONS_OLDER_THAN, 0);
+        servicesConf.setInt(RecoveryService.CONF_SERVICE_INTERVAL, 10);
+        services.init();
+
+        ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION, true);
+
+        Configuration conf = new XConfiguration();
+        String workflowUri = getTestCaseFileUri("workflow.xml");
+        //@formatter:off
+        String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:1.0\" name=\"wf-fork\">"
+                + "<start to=\"fork1\"/>"
+                + "<fork name=\"fork1\">"
+                + "<path start=\"action1\"/>"
+                + "<path start=\"action2\"/>"
+                + "<path start=\"action3\"/>"
+                + "<path start=\"action4\"/>"
+                + "<path start=\"action5\"/>"
+                + "</fork>"
+                + "<action name=\"action1\">"
+                + "<fs></fs>"
+                + "<ok to=\"join1\"/>"
+                + "<error to=\"kill\"/>"
+                + "</action>"
+                + "<action name=\"action2\">"
+                + "<fs></fs><ok to=\"join1\"/>"
+                + "<error to=\"kill\"/>"
+                + "</action>"
+                + "<action name=\"action3\">"
+                + "<fs></fs><ok to=\"join1\"/>"
+                + "<error to=\"kill\"/>"
+                + "</action>"
+                + "<action name=\"action4\">"
+                + "<fs></fs><ok to=\"join1\"/>"
+                + "<error to=\"kill\"/>"
+                + "</action>"
+                + "<action name=\"action5\">"
+                + "<fs></fs><ok to=\"join1\"/>"
+                + "<error to=\"kill\"/>"
+                + "</action>"
+                + "<join name=\"join1\" to=\"end\"/>"
+                + "<kill name=\"kill\"><message>killed</message>"
+                + "</kill><"
+                + "end name=\"end\"/>"
+                + "</workflow-app>";
+        //@Formatter:on
+
+        writeToFile(appXml, workflowUri);
+        conf.set(OozieClient.APP_PATH, workflowUri);
+        conf.set(OozieClient.USER_NAME, getTestUser());
+
+        SubmitXCommand sc = new SubmitXCommand(conf);
+        final String jobId = sc.call();
+        new StartXCommand(jobId).call();
+
+        waitFor(30 * 1000, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                return WorkflowJobQueryExecutor.getInstance()
+                        .get(WorkflowJobQueryExecutor.WorkflowJobQuery.GET_WORKFLOW, jobId)
+                        .getStatus() == WorkflowJob.Status.SUCCEEDED;
+            }
+        });
+
+        assertEquals(WorkflowJobQueryExecutor.getInstance()
+                        .get(WorkflowJobQueryExecutor.WorkflowJobQuery.GET_WORKFLOW, jobId)
+                        .getStatus(),
+                WorkflowJob.Status.SUCCEEDED);
+    }
 }
diff --git a/release-log.txt b/release-log.txt
index 5e3b886dd..7841defda 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.3.0 release (trunk - unreleased)
+OOZIE-3717 When fork actions parallel submit, becasue ForkedActionStartXCommand and ActionStartXCommand has the same name, so ForkedActionStartXCommand would be lost, and cause deadlock (chenhd via dionusos)
 OOZIE-3715 Fix fork out more than one transitions submit , one transition submit fail can't execute KillXCommand (chenhd via dionusos)
 OOZIE-3716 Invocation of Main class completed Message is skipped when LauncherSecurityManager calls system exit (khr9603 via dionusos)
 OOZIE-3695 [sharelib-hive2] Fix current SpotBugs discovered issues in Oozie's sharelib-hive2 module (jmakai via dionusos)