This is an automated email from the ASF dual-hosted git repository.

dionusos pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/oozie.git


The following commit(s) were added to refs/heads/master by this push:
     new 13837d9b1 OOZIE-3670 Actions can stuck while running in a Fork-Join 
workflow (jmakai via dionusos)
13837d9b1 is described below

commit 13837d9b16c793eea2d9ce40052d956417102450
Author: Denes Bodo <dionu...@apache.org>
AuthorDate: Wed Nov 30 09:10:07 2022 +0100

    OOZIE-3670 Actions can stuck while running in a Fork-Join workflow (jmakai 
via dionusos)
---
 .../java/org/apache/oozie/WorkflowActionBean.java  |  7 +-
 .../java/org/apache/oozie/command/XCommand.java    | 34 ++++++++-
 .../oozie/command/wf/ActionCheckXCommand.java      | 18 ++++-
 .../apache/oozie/command/wf/ActionEndXCommand.java | 33 ++++++---
 .../apache/oozie/command/wf/SignalXCommand.java    | 15 ++++
 ...utsideOfProvidedActionGetForJobJPAExecutor.java | 65 +++++++++++++++++
 .../oozie/command/wf/TestActionCheckXCommand.java  | 82 ++++++++++------------
 .../oozie/command/wf/TestActionEndXCommand.java    | 67 ++++++++++++++++++
 .../java/org/apache/oozie/test/XDataTestCase.java  | 76 ++++++++++++++++++++
 release-log.txt                                    |  1 +
 10 files changed, 340 insertions(+), 58 deletions(-)

diff --git a/core/src/main/java/org/apache/oozie/WorkflowActionBean.java 
b/core/src/main/java/org/apache/oozie/WorkflowActionBean.java
index 2d75e1670..f47969c9b 100644
--- a/core/src/main/java/org/apache/oozie/WorkflowActionBean.java
+++ b/core/src/main/java/org/apache/oozie/WorkflowActionBean.java
@@ -155,7 +155,12 @@ import org.json.simple.JSONObject;
             + "= 'END_MANUAL')"),
 
     @NamedQuery(name = "GET_ACTIONS_FOR_WORKFLOW_RERUN", query = "select a.id, 
a.name, a.statusStr, a.endTimestamp, a.type "
-            + "from WorkflowActionBean a where a.wfId = :wfId order by 
a.startTimestamp") })
+            + "from WorkflowActionBean a where a.wfId = :wfId order by 
a.startTimestamp"),
+
+    @NamedQuery(name = "GET_ACTIONS_FAILED_OUTSIDE_OF_PROVIDED_ACTION", query 
= "select OBJECT(a) from "
+            + "WorkflowActionBean a where a.wfId = :wfId AND a.id <> :actionId 
AND a.statusStr = 'FAILED' order by "
+            + "a.startTimestamp")})
+
 @Table(name = "WF_ACTIONS")
 public class WorkflowActionBean implements Writable, WorkflowAction, JsonBean {
     @Id
diff --git a/core/src/main/java/org/apache/oozie/command/XCommand.java 
b/core/src/main/java/org/apache/oozie/command/XCommand.java
index 28918b6f7..1919f95d4 100644
--- a/core/src/main/java/org/apache/oozie/command/XCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/XCommand.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,11 +20,16 @@ package org.apache.oozie.command;
 
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.FaultInjection;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.XException;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import 
org.apache.oozie.executor.jpa.WorkflowActionsFailedOutsideOfProvidedActionGetForJobJPAExecutor;
 import org.apache.oozie.service.CallableQueueService;
 import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.InstrumentationService;
+import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.MemoryLocksService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.util.Instrumentation;
@@ -547,4 +552,31 @@ public abstract class XCommand<T> implements XCallable<T> {
     public String toString() {
         return getKey();
     }
+
+    /**
+     * Checks whether the given workflow job contains at least one failed 
action except for the action which this
+     * function was called with
+     *
+     * @param wfJob the workflow job
+     * @param wfAction the workflow action
+     * @return true if there is a failed action outside of the action which 
this function was called with
+     * @throws CommandException if the JPAService is not available
+     */
+    public boolean isOtherActionFailedUnderJob(WorkflowJobBean wfJob, 
WorkflowActionBean wfAction) throws CommandException {
+        JPAService jpaService = Services.get().get(JPAService.class);
+        if (jpaService == null) {
+            throw new CommandException(ErrorCode.E0610);
+        }
+
+        List<WorkflowActionBean> actionList = null;
+        try {
+            // Getting the failed actions of the given job outside of the 
given action
+            actionList = jpaService.execute(new 
WorkflowActionsFailedOutsideOfProvidedActionGetForJobJPAExecutor(
+                    wfJob.getId(), wfAction.getId()));
+        } catch (JPAExecutorException e) {
+            LOG.error("Could not get the actions of job [{0}]", wfJob.getId(), 
e);
+        }
+
+        return actionList != null && actionList.size() > 0;
+    }
 }
diff --git 
a/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java 
b/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
index 335527d65..0cdfa5aa0 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -159,6 +159,22 @@ public class ActionCheckXCommand extends 
ActionXCommand<Void> {
             catch (JPAExecutorException e) {
                 throw new CommandException(e);
             }
+
+            // In case of forked actions there might be a case when an action 
- running in parallel - fails.
+            // In that case in the same fork, an other running action would 
not pass the precondition
+            // check, as the workflow job itself gets failed as well because 
of the other action's failure.
+            // This behaviour leads to the incidence that the action will 
stick in RUNNING phase.
+            // Hence the below method is responsible for recognizing those 
scenarios.
+
+            // If there is another action whose status is FAILED in the 
same workflow job as the action
+            // being checked, then this action must have been launched in 
parallel with that other action,
+            // because otherwise the workflow job would not have transitioned 
to this action after the
+            // other action's failure.
+            if (isOtherActionFailedUnderJob(wfJob, wfAction)) {
+                // Skipping throwing exception, therefore preventing this 
action to be stuck in RUNNING phase
+                return;
+            }
+
             throw new PreconditionException(ErrorCode.E0818, wfAction.getId(), 
wfJob.getId(), wfJob.getStatus());
         }
     }
diff --git 
a/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java 
b/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
index 8bf4fc75e..08f4f8f4a 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
@@ -128,23 +128,38 @@ public class ActionEndXCommand extends 
ActionXCommand<Void> {
         if (wfAction == null) {
             throw new PreconditionException(ErrorCode.E0605, actionId);
         }
-        if (wfAction.isPending()
-                && (wfAction.getStatus() == WorkflowActionBean.Status.DONE
-                        || wfAction.getStatus() == 
WorkflowActionBean.Status.END_RETRY || wfAction.getStatus()
-                        == WorkflowActionBean.Status.END_MANUAL)) {
+
+        executor = 
Services.get().get(ActionService.class).getExecutor(wfAction.getType());
+        if (executor == null) {
+            throw new CommandException(ErrorCode.E0802, wfAction.getType());
+        }
+
+        if (wfAction.isPending() && (wfAction.getStatus() == 
WorkflowActionBean.Status.DONE
+                || wfAction.getStatus() == WorkflowActionBean.Status.END_RETRY
+                || wfAction.getStatus() == 
WorkflowActionBean.Status.END_MANUAL)) {
 
             if (wfJob.getStatus() != WorkflowJob.Status.RUNNING) {
+                // In case of forked actions there might be a case when an 
action - running in parallel - fails.
+                // In that case in the same fork, an other running action 
would not pass the precondition
+                // check, as the workflow job itself gets failed as well 
because of the other action's failure.
+                // This behaviour leads to the incidence that the action will 
stick in RUNNING phase.
+                // Hence the below method is responsible for recognizing those 
scenarios.
+
+                // If there is another action whose status is FAILED in 
the same workflow job as the action
+                // being checked, then this action must have been launched in 
parallel with that other action,
+                // because otherwise the workflow job would not have 
transitioned to this action after the
+                // other action's failure.
+                if (isOtherActionFailedUnderJob(wfJob, wfAction)) {
+                    // Skipping throwing exception, therefore preventing this 
action to be stuck in RUNNING phase
+                    return;
+                }
+
                 throw new PreconditionException(ErrorCode.E0811,  
WorkflowJob.Status.RUNNING.toString());
             }
         }
         else {
             throw new PreconditionException(ErrorCode.E0812, 
wfAction.isPending(), wfAction.getStatusStr());
         }
-
-        executor = 
Services.get().get(ActionService.class).getExecutor(wfAction.getType());
-        if (executor == null) {
-            throw new CommandException(ErrorCode.E0802, wfAction.getType());
-        }
     }
 
     @Override
diff --git a/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java 
b/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
index 4db55ccde..37f55f820 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
@@ -155,6 +155,21 @@ public class SignalXCommand extends WorkflowXCommand<Void> 
{
     protected void verifyPrecondition() throws CommandException, 
PreconditionException {
         if ((wfAction == null) || (wfAction.isComplete() && 
wfAction.isPending())) {
             if (wfJob.getStatus() != WorkflowJob.Status.RUNNING && 
wfJob.getStatus() != WorkflowJob.Status.PREP) {
+                // In case of forked actions there might be a case when an 
action - running in parallel - fails.
+                // In that case in the same fork, an other running action 
would not pass the precondition
+                // check, as the workflow job itself gets failed as well 
because of the other action's failure.
+                // This behaviour leads to the incidence that the action will 
stick in RUNNING phase.
+                // Hence the below method is responsible for recognizing those 
scenarios.
+
+                // If there is another action whose status is FAILED in 
the same workflow job as the action
+                // being checked, then this action must have been launched in 
parallel with that other action,
+                // because otherwise the workflow job would not have 
transitioned to this action after the
+                // other action's failure.
+                if (isOtherActionFailedUnderJob(wfJob, wfAction)) {
+                    // Skipping throwing exception, therefore preventing this 
action to be stuck in RUNNING phase
+                    return;
+                }
+
                 throw new PreconditionException(ErrorCode.E0813, 
wfJob.getStatusStr());
             }
         }
diff --git 
a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionsFailedOutsideOfProvidedActionGetForJobJPAExecutor.java
 
b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionsFailedOutsideOfProvidedActionGetForJobJPAExecutor.java
new file mode 100644
index 000000000..eb7d98a75
--- /dev/null
+++ 
b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionsFailedOutsideOfProvidedActionGetForJobJPAExecutor.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.executor.jpa;
+
+import java.util.List;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.util.ParamChecker;
+
+/**
+ * Load the list of FAILED WorkflowActions of a WorkflowJob, excluding the provided action, and return the list.
+ */
+public class WorkflowActionsFailedOutsideOfProvidedActionGetForJobJPAExecutor 
implements JPAExecutor<List<WorkflowActionBean>> {
+
+    private String wfJobId = null;
+    private String wfActionId = null;
+
+    public 
WorkflowActionsFailedOutsideOfProvidedActionGetForJobJPAExecutor(String 
wfJobId, String wfActionId) {
+        ParamChecker.notEmpty(wfJobId, "wfJobId");
+        ParamChecker.notEmpty(wfActionId, "wfActionId");
+        this.wfJobId = wfJobId;
+        this.wfActionId = wfActionId;
+    }
+
+    @Override
+    public String getName() {
+        return 
"WorkflowActionsFailedOutsideOfProvidedActionGetForJobJPAExecutor";
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public List<WorkflowActionBean> execute(EntityManager em) throws 
JPAExecutorException {
+        List<WorkflowActionBean> actions;
+        try {
+            Query q = 
em.createNamedQuery("GET_ACTIONS_FAILED_OUTSIDE_OF_PROVIDED_ACTION");
+            q.setParameter("wfId", wfJobId);
+            q.setParameter("actionId", wfActionId);
+            actions = q.getResultList();
+        }
+        catch (Exception e) {
+            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
+        }
+        return actions;
+    }
+}
diff --git 
a/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java 
b/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java
index 7fca19401..5c612ac70 100644
--- 
a/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java
+++ 
b/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -34,6 +34,7 @@ import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.action.ActionExecutor;
 import org.apache.oozie.action.ActionExecutorException;
+import org.apache.oozie.action.control.ForkActionExecutor;
 import org.apache.oozie.action.hadoop.LauncherHelper;
 import org.apache.oozie.action.hadoop.MapReduceActionExecutor;
 import org.apache.oozie.action.hadoop.MapperReducerForTest;
@@ -254,6 +255,39 @@ public class TestActionCheckXCommand extends XDataTestCase 
{
         assertEquals(3L, counterVal);
     }
 
+    /**
+     * Test : verify that ActionCheckXCommand does not fail the precondition 
verification phase when job != RUNNING and
+     * there is another action whose status is FAILED.
+     *
+     * @throws Exception
+     */
+    public void testParalellyFailedActionInJobContainingFork() throws 
Exception {
+        Instrumentation inst = 
Services.get().get(InstrumentationService.class).get();
+
+        WorkflowJobBean job = 
this.addRecordToWfJobTable(WorkflowJob.Status.FAILED, 
WorkflowInstance.Status.FAILED);
+
+        WorkflowActionBean forkAction = 
this.addRecordToWfActionTableWithType(job.getId(), "forkAction",
+                WorkflowAction.Status.OK, ForkActionExecutor.TYPE);
+
+        WorkflowActionBean failedAction = 
this.addRecordToWfActionTable(job.getId(), "failedAction",
+                WorkflowAction.Status.FAILED);
+
+        WorkflowActionBean greenAction = 
this.addRecordToWfActionTable(job.getId(), "greenAction",
+                WorkflowAction.Status.RUNNING);
+
+        ActionCheckXCommand checkCmd = new 
ActionCheckXCommand(greenAction.getId());
+
+        checkCmd.call();
+
+        try {
+            // this is supposed to throw NullPointerException
+            
inst.getCounters().get(XCommand.INSTRUMENTATION_GROUP).get(checkCmd.getName() + 
".preconditionfailed").getValue();
+            fail("A NullPointerException should have been thrown");
+        } catch (NullPointerException expect) {
+            // we should get here
+        }
+    }
+
     public void testActionCheck() throws Exception {
         JPAService jpaService = Services.get().get(JPAService.class);
         WorkflowJobBean job = 
this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING, 
WorkflowInstance.Status.RUNNING);
@@ -608,7 +642,7 @@ public class TestActionCheckXCommand extends XDataTestCase {
     @Override
     protected WorkflowActionBean addRecordToWfActionTable(
             String wfId, String actionName, WorkflowAction.Status status) 
throws Exception {
-        WorkflowActionBean action = createWorkflowActionSetPending(wfId, 
status);
+        WorkflowActionBean action = createWorkflowActionSetPending(wfId, 
actionName, status);
         try {
             JPAService jpaService = Services.get().get(JPAService.class);
             assertNotNull(jpaService);
@@ -622,48 +656,4 @@ public class TestActionCheckXCommand extends XDataTestCase 
{
         }
         return action;
     }
-
-    protected WorkflowActionBean createWorkflowActionSetPending(String wfId, 
WorkflowAction.Status status) throws Exception {
-        WorkflowActionBean action = new WorkflowActionBean();
-        String actionname = "testAction";
-        action.setName(actionname);
-        
action.setId(Services.get().get(UUIDService.class).generateChildId(wfId, 
actionname));
-        action.setJobId(wfId);
-        action.setType("map-reduce");
-        action.setTransition("transition");
-        action.setStatus(status);
-        action.setStartTime(new Date());
-        action.setEndTime(new Date());
-        action.setLastCheckTime(new Date());
-        action.setPending();
-        action.setExecutionPath("/");
-        action.setUserRetryMax(2);
-
-        Path inputDir = new Path(getFsTestCaseDir(), "input");
-        Path outputDir = new Path(getFsTestCaseDir(), "output");
-
-        FileSystem fs = getFileSystem();
-        Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, 
"data.txt")), StandardCharsets.UTF_8);
-        w.write("dummy\n");
-        w.write("dummy\n");
-        w.close();
-
-        String actionXml = "<map-reduce>" +
-        "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
-        "<name-node>" + getNameNodeUri() + "</name-node>" +
-        "<prepare><delete path=\"" + outputDir.toString() + "\"/></prepare>" +
-        "<configuration>" +
-        "<property><name>mapred.mapper.class</name><value>" + 
MapperReducerForTest.class.getName() +
-        "</value></property>" +
-        "<property><name>mapred.reducer.class</name><value>" + 
MapperReducerForTest.class.getName() +
-        "</value></property>" +
-        
"<property><name>mapred.input.dir</name><value>"+inputDir.toString()+"</value></property>"
 +
-        
"<property><name>mapred.output.dir</name><value>"+outputDir.toString()+"</value></property>"
 +
-        "</configuration>" +
-        "</map-reduce>";
-        action.setConf(actionXml);
-
-        return action;
-    }
-
 }
diff --git 
a/core/src/test/java/org/apache/oozie/command/wf/TestActionEndXCommand.java 
b/core/src/test/java/org/apache/oozie/command/wf/TestActionEndXCommand.java
new file mode 100644
index 000000000..0b212c17a
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestActionEndXCommand.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.wf;
+
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.action.control.ForkActionExecutor;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.command.XCommand;
+import org.apache.oozie.service.InstrumentationService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.util.Instrumentation;
+import org.apache.oozie.workflow.WorkflowInstance;
+
+public class TestActionEndXCommand extends XDataTestCase {
+
+    /**
+     * Test : verify that ActionEndXCommand does not fail the precondition 
verification phase when job != RUNNING and
+     * there is another action whose status is FAILED.
+     *
+     * @throws Exception
+     */
+    public void testParalellyFailedActionInJobContainingFork() throws 
Exception {
+        Instrumentation inst = 
Services.get().get(InstrumentationService.class).get();
+
+        WorkflowJobBean job = 
this.addRecordToWfJobTable(WorkflowJob.Status.FAILED, 
WorkflowInstance.Status.FAILED);
+
+        WorkflowActionBean forkAction = 
this.addRecordToWfActionTableWithType(job.getId(), "forkAction",
+                WorkflowAction.Status.OK, ForkActionExecutor.TYPE);
+
+        WorkflowActionBean failedAction = 
this.addRecordToWfActionTable(job.getId(), "failedAction",
+                WorkflowAction.Status.FAILED);
+
+        WorkflowActionBean greenAction = 
this.addRecordToWfActionTable(job.getId(), "greenAction",
+                WorkflowAction.Status.DONE, "/", true);
+
+        ActionEndXCommand checkCmd = new 
ActionEndXCommand(greenAction.getId(), greenAction.getType());
+
+        checkCmd.call();
+
+        try {
+            // this is supposed to throw NullPointerException
+            
inst.getCounters().get(XCommand.INSTRUMENTATION_GROUP).get(checkCmd.getName() + 
".preconditionfailed").getValue();
+            fail("A NullPointerException should have been thrown");
+        } catch (NullPointerException expect) {
+            // we should get here
+        }
+    }
+}
diff --git a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java 
b/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
index db22103eb..8e5d69aa0 100644
--- a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
+++ b/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
@@ -1972,4 +1972,80 @@ public abstract class XDataTestCase extends 
XHCatTestCase {
         assertEquals(stat, action.getStatus());
         return action;
     }
+
+    protected WorkflowActionBean addRecordToWfActionTableWithType(String wfId, 
String actionName, WorkflowAction.Status status,
+                                                                  String type) 
throws Exception {
+        WorkflowActionBean action = createWorkflowActionSetPending(wfId, 
actionName, status);
+        action.setType(type);
+        try {
+            JPAService jpaService = Services.get().get(JPAService.class);
+            assertNotNull(jpaService);
+            WorkflowActionInsertJPAExecutor actionInsertCmd = new 
WorkflowActionInsertJPAExecutor(action);
+            jpaService.execute(actionInsertCmd);
+        }
+        catch (JPAExecutorException ce) {
+            ce.printStackTrace();
+            fail("Unable to insert the test wf action record to table");
+            throw ce;
+        }
+        return action;
+    }
+
+    protected WorkflowActionBean createWorkflowActionSetPending(String wfId, 
WorkflowAction.Status status) throws Exception {
+        return createWorkflowActionSetPending(wfId, "actionName", status);
+    }
+
+    protected WorkflowActionBean createWorkflowActionSetPending(String wfId, 
String actionName, WorkflowAction.Status status)
+            throws Exception {
+        WorkflowActionBean action = new WorkflowActionBean();
+        action.setName(actionName);
+        
action.setId(Services.get().get(UUIDService.class).generateChildId(wfId, 
actionName));
+        action.setJobId(wfId);
+        action.setType("map-reduce");
+        action.setTransition("transition");
+        action.setStatus(status);
+        action.setStartTime(new Date());
+        action.setEndTime(new Date());
+        action.setLastCheckTime(new Date());
+        action.setPending();
+        action.setExecutionPath("/");
+        action.setUserRetryMax(2);
+
+        Path inputDir = new Path(getFsTestCaseDir(), "input");
+        Path outputDir = new Path(getFsTestCaseDir(), "output");
+
+        FileSystem fs = getFileSystem();
+        Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, 
"data.txt")), StandardCharsets.UTF_8);
+        w.write("dummy\n");
+        w.write("dummy\n");
+        w.close();
+
+        String actionXml =
+                        " <map-reduce>"
+                        + "  <job-tracker>" + getJobTrackerUri() + 
"</job-tracker>"
+                        + "  <name-node>" + getNameNodeUri() + "</name-node>"
+                        + "  <prepare><delete path=\"" + outputDir.toString() 
+ "\"/></prepare>"
+                        + "  <configuration>"
+                        + "    <property>"
+                        + "      <name>mapred.mapper.class</name>"
+                        + "      <value>" + 
MapperReducerForTest.class.getName() + "</value>"
+                        + "    </property>"
+                        + "    <property>"
+                        + "      <name>mapred.reducer.class</name>"
+                        + "      <value>" + 
MapperReducerForTest.class.getName() + "</value>"
+                        + "    </property>"
+                        + "    <property>"
+                        + "      <name>mapred.input.dir</name>"
+                        + "      <value>" + inputDir.toString() + "</value>"
+                        + "    </property>"
+                        + "    <property>"
+                        + "      <name>mapred.output.dir</name>"
+                        + "      <value>" + outputDir.toString() + "</value>"
+                        + "    </property>"
+                        + "  </configuration>"
+                        + "</map-reduce>";
+        action.setConf(actionXml);
+
+        return action;
+    }
 }
diff --git a/release-log.txt b/release-log.txt
index ffee90849..4cfb75bd5 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.3.0 release (trunk - unreleased)
 
+OOZIE-3670 Actions can stuck while running in a Fork-Join workflow (jmakai via 
dionusos)
 OOZIE-3676 Remove all non FIPS compliant encoding algorithms (jmakai via 
dionusos)
 OOZIE-3674 Add a --insecure like parameter to Oozie client so it can ignore 
certificate errors (jmakai via dionusos)
 OOZIE-3673 Add possibility to configure custom SSL/TLS protocols when 
executing an email action (jmakai via dionusos)

Reply via email to