This is an automated email from the ASF dual-hosted git repository.

dionusos pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/oozie.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c614c74c OOZIE-3717 When fork actions are submitted in parallel, because ForkedActionStartXCommand and ActionStartXCommand have the same name, ForkedActionStartXCommand would be lost and cause a deadlock (chenhd via dionusos)
3c614c74c is described below

commit 3c614c74cb5cb8897a2f95334b5e467227edf740
Author: Denes Bodo <dionu...@apache.org>
AuthorDate: Mon Aug 7 16:19:24 2023 +0200

    OOZIE-3717 When fork actions are submitted in parallel, because ForkedActionStartXCommand and ActionStartXCommand have the same name, ForkedActionStartXCommand would be lost and cause a deadlock (chenhd via dionusos)
---
 .../oozie/command/wf/ActionStartXCommand.java      |  14 +-
 .../command/wf/ForkedActionStartXCommand.java      |   7 +-
 .../oozie/command/wf/TestSignalXCommand.java       | 160 ++++++++++++++++++++-
 release-log.txt                                    |   1 +
 4 files changed, 175 insertions(+), 7 deletions(-)

diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
index 387cbbedb..d59d78f59 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
@@ -56,7 +56,6 @@ import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.UUIDService;
-import org.apache.oozie.util.DateUtils;
 import org.apache.oozie.util.ELEvaluationException;
 import org.apache.oozie.util.Instrumentation;
 import org.apache.oozie.util.JobUtils;
@@ -96,6 +95,19 @@ public class ActionStartXCommand extends ActionXCommand<org.apache.oozie.command
         this.jobId = wfJob.getId();
     }
 
+    public ActionStartXCommand(String actionId, String type, String name) {
+        super(name, type, 0);
+        this.actionId = actionId;
+        this.jobId = Services.get().get(UUIDService.class).getId(actionId);
+    }
+
+    public ActionStartXCommand(WorkflowJobBean job, String actionId, String type, String name) {
+        super(name, type, 0);
+        this.actionId = actionId;
+        this.wfJob = job;
+        this.jobId = wfJob.getId();
+    }
+
     @Override
     protected void setLogInfo() {
         LogUtils.setLogInfo(actionId);
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ForkedActionStartXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ForkedActionStartXCommand.java
index 91da0b8f4..f5e27e96b 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ForkedActionStartXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ForkedActionStartXCommand.java
@@ -25,16 +25,17 @@ import org.apache.oozie.client.Job;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.XCommand;
-import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
 
 public class ForkedActionStartXCommand extends ActionStartXCommand {
 
+    private final static String FORKED_ACTION_START_NAME = "action.forkedstart";
+
     public ForkedActionStartXCommand(String actionId, String type) {
-        super(actionId, type);
+        super(actionId, type, FORKED_ACTION_START_NAME);
     }
 
     public ForkedActionStartXCommand(WorkflowJobBean wfJob, String id, String type) {
-        super(wfJob, id, type);
+        super(wfJob, id, type, FORKED_ACTION_START_NAME);
     }
 
     protected ActionExecutorContext execute() throws CommandException {
diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java
index 9ffe2d543..d640150e0 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java
@@ -45,10 +45,8 @@ import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
-import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
 import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobsGetForPurgeJPAExecutor;
 import org.apache.oozie.local.LocalOozie;
 import org.apache.oozie.service.CallableQueueService;
 import org.apache.oozie.service.ConfigurationService;
@@ -62,8 +60,15 @@ import org.apache.oozie.test.XDataTestCase;
 import org.apache.oozie.util.IOUtils;
 import org.apache.oozie.util.XConfiguration;
 import org.apache.oozie.workflow.lite.LiteWorkflowAppParser;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.oozie.util.XLog;
+import org.apache.oozie.util.XCallable;
+import org.apache.oozie.service.RecoveryService;
+
 
-import javax.persistence.EntityManager;
 
 public class TestSignalXCommand extends XDataTestCase {
 
@@ -554,4 +559,153 @@ public class TestSignalXCommand extends XDataTestCase {
             }
         }
     }
+
+    /**
+     * for test {@link #testDeadlockForForkParallelSubmit()}
+     */
+    public static class TestRecoverForkStartActionCallableQueueService extends CallableQueueService{
+        private final XLog log = XLog.getLog(getClass());
+        public static TestSignalXCommand testSignalXCommand;
+
+        /**
+         *  Overwrite for test the same action's ActionStartXCommand in queue and the ForkedActionStartXCommand wouldn't lose,
+         *  if ActionStartXCommand and ForkedActionStartXCommand has the same name, ForkedActionStartXCommand couldn't enqueue
+         *  after a ActionStartXCommand in queue waiting for running.
+         */
+        public class CallableWrapper<E> extends CallableQueueService.CallableWrapper<E> {
+
+            boolean forkedActionStartXCommandFirstEnter;
+            public CallableWrapper(XCallable<E> callable, long delay) {
+                super(callable,delay);
+                forkedActionStartXCommandFirstEnter = callable instanceof ForkedActionStartXCommand;
+            }
+
+
+            public void run() {
+                XCallable<?> callable = getElement();
+                if (forkedActionStartXCommandFirstEnter && callable instanceof ForkedActionStartXCommand){
+                    // make sure there has a ActionStartXCommand in the queue wait to run,
+                    // and then ForkedActionStartXCommand enqueue
+                    testSignalXCommand.waitFor(15 * 1000, new Predicate() {
+                        @Override
+                        public boolean evaluate() throws Exception {
+                            return !filterDuplicates();
+                        }
+                    },200);
+
+                    log.warn("max concurrency for callable [{0}] exceeded, enqueueing with [{1}]ms delay", callable
+                            .getType(), CONCURRENCY_DELAY);
+                    setDelay(CONCURRENCY_DELAY, TimeUnit.MILLISECONDS);
+
+                    try {
+                        Method queue = CallableQueueService.class.getDeclaredMethod("queue",
+                                CallableQueueService.CallableWrapper.class, boolean.class);
+                        queue.setAccessible(true);
+                        queue.invoke(TestRecoverForkStartActionCallableQueueService.this,this,true);
+                    } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
+                        throw new RuntimeException(e);
+                    }
+                    forkedActionStartXCommandFirstEnter = false;
+                }else {
+                    super.run();
+                }
+            }
+        }
+
+        /**
+         * Replace CallableQueueService.CallableWrapper to TestRecoverForkStartActionCallableQueueService.CallableWrapper
+         * for text, in order to call TestRecoverForkStartActionCallableQueueService.CallableWrapper for wait
+         * ActionStartXCommand enqueue before.
+         *
+         */
+        public <T> Future<T> submit(CallableQueueService.CallableWrapper<T> task) throws InterruptedException {
+            return super.submit(new TestRecoverForkStartActionCallableQueueService.CallableWrapper<T>(task.getElement(),
+                    task.getInitialDelay()));
+        }
+    }
+
+
+    /**
+     * Test : fork parallel submit, the action has the same XCommand in queue, there will skip enqueue by
+     * {@link CallableQueueService.CallableWrapper#filterDuplicates()} so if the ActionStartXCommand  and
+     * ForkedActionStartXCommand has the same name, it would be lost.
+     *
+     * Note : RecoveryService will check the pending action and try to start it. So if the action's ForkedActionStartXCommand
+     * wait for run, there may be a ActionStartXCommand add for the same action.
+     *
+     */
+    public void testDeadlockForForkParallelSubmit() throws Exception {
+        setSystemProperty(Services.CONF_SERVICE_EXT_CLASSES, TestRecoverForkStartActionCallableQueueService.class.getName());
+        TestRecoverForkStartActionCallableQueueService.testSignalXCommand = this;
+
+        services = new Services();
+        Configuration servicesConf = services.getConf();
+        servicesConf.setInt(RecoveryService.CONF_WF_ACTIONS_OLDER_THAN, 0);
+        servicesConf.setInt(RecoveryService.CONF_SERVICE_INTERVAL, 10);
+        services.init();
+
+        ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION, true);
+
+        Configuration conf = new XConfiguration();
+        String workflowUri = getTestCaseFileUri("workflow.xml");
+        //@formatter:off
+        String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:1.0\" name=\"wf-fork\">"
+                + "<start to=\"fork1\"/>"
+                + "<fork name=\"fork1\">"
+                + "<path start=\"action1\"/>"
+                + "<path start=\"action2\"/>"
+                + "<path start=\"action3\"/>"
+                + "<path start=\"action4\"/>"
+                + "<path start=\"action5\"/>"
+                + "</fork>"
+                + "<action name=\"action1\">"
+                + "<fs></fs>"
+                + "<ok to=\"join1\"/>"
+                + "<error to=\"kill\"/>"
+                + "</action>"
+                + "<action name=\"action2\">"
+                + "<fs></fs><ok to=\"join1\"/>"
+                + "<error to=\"kill\"/>"
+                + "</action>"
+                + "<action name=\"action3\">"
+                + "<fs></fs><ok to=\"join1\"/>"
+                + "<error to=\"kill\"/>"
+                + "</action>"
+                + "<action name=\"action4\">"
+                + "<fs></fs><ok to=\"join1\"/>"
+                + "<error to=\"kill\"/>"
+                + "</action>"
+                + "<action name=\"action5\">"
+                + "<fs></fs><ok to=\"join1\"/>"
+                + "<error to=\"kill\"/>"
+                + "</action>"
+                + "<join name=\"join1\" to=\"end\"/>"
+                + "<kill name=\"kill\"><message>killed</message>"
+                + "</kill><"
+                + "end name=\"end\"/>"
+                + "</workflow-app>";
+        //@Formatter:on
+
+        writeToFile(appXml, workflowUri);
+        conf.set(OozieClient.APP_PATH, workflowUri);
+        conf.set(OozieClient.USER_NAME, getTestUser());
+
+        SubmitXCommand sc = new SubmitXCommand(conf);
+        final String jobId = sc.call();
+        new StartXCommand(jobId).call();
+
+        waitFor(30 * 1000, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                return WorkflowJobQueryExecutor.getInstance()
+                        .get(WorkflowJobQueryExecutor.WorkflowJobQuery.GET_WORKFLOW, jobId)
+                        .getStatus() == WorkflowJob.Status.SUCCEEDED;
+            }
+        });
+
+        assertEquals(WorkflowJobQueryExecutor.getInstance()
+                        .get(WorkflowJobQueryExecutor.WorkflowJobQuery.GET_WORKFLOW, jobId)
+                        .getStatus(),
+                WorkflowJob.Status.SUCCEEDED);
+    }
 }
diff --git a/release-log.txt b/release-log.txt
index 5e3b886dd..7841defda 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.3.0 release (trunk - unreleased)
 
+OOZIE-3717 When fork actions are submitted in parallel, because ForkedActionStartXCommand and ActionStartXCommand have the same name, ForkedActionStartXCommand would be lost and cause a deadlock (chenhd via dionusos)
 OOZIE-3715 Fix fork out more than one transitions submit , one transition submit fail can't execute KillXCommand (chenhd via dionusos)
 OOZIE-3716 Invocation of Main class completed Message is skipped when LauncherSecurityManager calls system exit (khr9603 via dionusos)
 OOZIE-3695 [sharelib-hive2] Fix current SpotBugs discovered issues in Oozie's sharelib-hive2 module (jmakai via dionusos)

Reply via email to