This is an automated email from the ASF dual-hosted git repository.

kerwin pushed a commit to branch 3.1.9-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/3.1.9-prepare by this push:
     new 81a6057fd3 cherry-pick [Fix]Solve the deadlock problem caused by 
queuing #13191
81a6057fd3 is described below

commit 81a6057fd3e444d8f6bf959b7812463c265509a5
Author: sssqhai <[email protected]>
AuthorDate: Fri Dec 16 19:55:02 2022 +0800

    cherry-pick [Fix]Solve the deadlock problem caused by queuing #13191
---
 ...ntHandler.java => StateEventHandleFailure.java} | 25 ++++++++++------------
 .../server/master/event/StateEventHandler.java     |  5 +++--
 .../event/TaskWaitTaskGroupStateHandler.java       |  8 +++----
 .../master/runner/WorkflowExecuteRunnable.java     | 14 +++++++++---
 4 files changed, 28 insertions(+), 24 deletions(-)

diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandler.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandleFailure.java
similarity index 54%
copy from 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandler.java
copy to 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandleFailure.java
index 00808b2e29..5e757c7858 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandler.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandleFailure.java
@@ -17,20 +17,17 @@
 
 package org.apache.dolphinscheduler.server.master.event;
 
-import org.apache.dolphinscheduler.common.enums.StateEventType;
-import 
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
-
-public interface StateEventHandler {
+/**
+ * This exception represents a recoverable failure. When we get this exception,
+ * we will move the event to the tail of the queue.
+ */
+public class StateEventHandleFailure extends Exception {
 
-    /**
-     * Handle a event, if handle success will reture true, else return false
-     *
-     * @param stateEvent given state event.
-     * @throws StateEventHandleException this exception means it can be 
recovered.
-     * @throws StateEventHandleError     this exception means it cannot be 
recovered, so the event need to drop.
-     */
-    boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, 
StateEvent stateEvent)
-        throws StateEventHandleException, StateEventHandleError;
+    public StateEventHandleFailure(String message) {
+        super(message);
+    }
 
-    StateEventType getEventType();
+    public StateEventHandleFailure(String message, Throwable throwable) {
+        super(message, throwable);
+    }
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandler.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandler.java
index 00808b2e29..377ea71f62 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandler.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandler.java
@@ -28,9 +28,10 @@ public interface StateEventHandler {
      * @param stateEvent given state event.
      * @throws StateEventHandleException this exception means it can be 
recovered.
      * @throws StateEventHandleError     this exception means it cannot be 
recovered, so the event need to drop.
+     * @throws StateEventHandleFailure   this exception means it can be recovered, the event will be moved to the tail of the queue.
      */
-    boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, 
StateEvent stateEvent)
-        throws StateEventHandleException, StateEventHandleError;
+    boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable,
+                             StateEvent stateEvent) throws 
StateEventHandleException, StateEventHandleError, StateEventHandleFailure;
 
     StateEventType getEventType();
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java
index da8b564ed8..2ee40b155b 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java
@@ -31,12 +31,10 @@ public class TaskWaitTaskGroupStateHandler implements 
StateEventHandler {
 
     @Override
     public boolean handleStateEvent(WorkflowExecuteRunnable 
workflowExecuteRunnable,
-                                    StateEvent stateEvent) {
+                                    StateEvent stateEvent) throws 
StateEventHandleFailure {
         logger.info("Handle task instance wait task group event, 
taskInstanceId: {}", stateEvent.getTaskInstanceId());
-        if (workflowExecuteRunnable.checkForceStartAndWakeUp(stateEvent)) {
-            logger.info("Success wake up task instance, taskInstanceId: {}", 
stateEvent.getTaskInstanceId());
-        } else {
-            logger.info("Failed to wake up task instance, taskInstanceId: {}", 
stateEvent.getTaskInstanceId());
+        if (!workflowExecuteRunnable.checkForceStartAndWakeUp(stateEvent)) {
+            throw new StateEventHandleFailure("Task state event handle failed 
due to robing taskGroup resource failed");
         }
         return true;
     }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 50bfc2099c..64652015f0 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -70,6 +70,7 @@ import 
org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutor
 import org.apache.dolphinscheduler.server.master.event.StateEvent;
 import org.apache.dolphinscheduler.server.master.event.StateEventHandleError;
 import 
org.apache.dolphinscheduler.server.master.event.StateEventHandleException;
+import org.apache.dolphinscheduler.server.master.event.StateEventHandleFailure;
 import org.apache.dolphinscheduler.server.master.event.StateEventHandler;
 import 
org.apache.dolphinscheduler.server.master.event.StateEventHandlerManager;
 import org.apache.dolphinscheduler.server.master.event.TaskStateEvent;
@@ -292,19 +293,26 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatue> {
             } catch (StateEventHandleError stateEventHandleError) {
                 logger.error("State event handle error, will remove this 
event: {}", stateEvent, stateEventHandleError);
                 this.stateEvents.remove(stateEvent);
-                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS_SHORT);
+                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
             } catch (StateEventHandleException stateEventHandleException) {
                 logger.error("State event handle error, will retry this event: 
{}",
                         stateEvent,
                         stateEventHandleException);
-                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS_SHORT);
+                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
+            } catch (StateEventHandleFailure stateEventHandleFailure) {
+                logger.error("State event handle failed, will move event to 
the tail: {}",
+                        stateEvent,
+                        stateEventHandleFailure);
+                this.stateEvents.remove(stateEvent);
+                this.stateEvents.offer(stateEvent);
+                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
             } catch (Exception e) {
                 // we catch the exception here, since if the state event 
handle failed, the state event will still keep
                 // in the stateEvents queue.
                 logger.error("State event handle error, get a unknown 
exception, will retry this event: {}",
                         stateEvent,
                         e);
-                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS_SHORT);
+                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
             } finally {
                 LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
             }

Reply via email to