This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 14272dafab Fix failover Master might not release taskGroup (#15287)
14272dafab is described below
commit 14272dafab40588c18eafb4e9aad70b8f79aaead
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed Dec 6 13:31:46 2023 +0800
Fix failover Master might not release taskGroup (#15287)
---
.../server/master/runner/WorkflowExecuteRunnable.java | 17 +++++++++++------
1 file changed, 11 insertions(+), 6 deletions(-)
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 35ad1e8c82..9006812431 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -465,12 +465,13 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
* release task group
*
*/
- public void releaseTaskGroup(TaskInstance taskInstance) throws
InterruptedException {
+ public void releaseTaskGroup(TaskInstance taskInstance) {
ProcessInstance workflowInstance =
workflowExecuteContext.getWorkflowInstance();
// todo: use Integer
if (taskInstance.getTaskGroupId() <= 0) {
log.info("The current TaskInstance: {} doesn't use taskGroup, no
need to release taskGroup",
taskInstance.getName());
+ return;
}
TaskInstance nextTaskInstance =
processService.releaseTaskGroup(taskInstance);
if (nextTaskInstance == null) {
@@ -1347,9 +1348,11 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
TaskExecutionStatus state = existTaskInstance.getState();
if (state == TaskExecutionStatus.RUNNING_EXECUTION
|| state == TaskExecutionStatus.DISPATCH
- || state == TaskExecutionStatus.SUBMITTED_SUCCESS) {
+ || state == TaskExecutionStatus.SUBMITTED_SUCCESS
+ || state == TaskExecutionStatus.DELAY_EXECUTION) {
// try to take over task instance
if (state != TaskExecutionStatus.SUBMITTED_SUCCESS
+ && state != TaskExecutionStatus.DELAY_EXECUTION
&& tryToTakeOverTaskInstance(existTaskInstance)) {
log.info("Success take over task {}",
existTaskInstance.getName());
continue;
@@ -1357,6 +1360,8 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
// set the task instance state to fault tolerance
existTaskInstance.setFlag(Flag.NO);
existTaskInstance.setState(TaskExecutionStatus.NEED_FAULT_TOLERANCE);
+ releaseTaskGroup(existTaskInstance);
+
validTaskMap.remove(existTaskInstance.getTaskCode());
taskInstanceDao.updateById(existTaskInstance);
existTaskInstance =
cloneTolerantTaskInstance(existTaskInstance);
@@ -1444,12 +1449,12 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
ITaskInstanceOperator iTaskInstanceOperator =
SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(taskInstance.getHost(),
ITaskInstanceOperator.class);
- UpdateWorkflowHostResponse updateWorkflowHostResponse =
iTaskInstanceOperator.updateWorkflowInstanceHost(
+ UpdateWorkflowHostResponse response =
iTaskInstanceOperator.updateWorkflowInstanceHost(
new UpdateWorkflowHostRequest(taskInstance.getId(),
masterConfig.getMasterAddress()));
- if (!updateWorkflowHostResponse.isSuccess()) {
+ if (!response.isSuccess()) {
log.error(
- "Takeover TaskInstance failed, receive a failed
response from worker: {}, will try to create a new TaskInstance",
- taskInstance.getHost());
+ "Takeover TaskInstance failed, receive a failed
response: {} from worker: {}, will try to create a new TaskInstance",
+ response, taskInstance.getHost());
return false;
}