This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new acabe809d [Hotfix][Zeta] fix pipeline state not right bug (#4823)
acabe809d is described below

commit acabe809da3945fabac6fa14f966032eabb0b9b2
Author: Eric <[email protected]>
AuthorDate: Tue May 30 17:24:34 2023 +0800

    [Hotfix][Zeta] fix pipeline state not right bug (#4823)
---
 .../engine/server/dag/physical/PhysicalPlan.java   |  12 -
 .../engine/server/dag/physical/PhysicalVertex.java | 138 ++++++++---
 .../engine/server/dag/physical/SubPlan.java        | 268 +++++++++++++--------
 .../server/scheduler/PipelineBaseScheduler.java    |  15 +-
 4 files changed, 295 insertions(+), 138 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index 4a81fdf63..ed3ec5c8c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -140,10 +140,6 @@ public class PhysicalPlan {
                 pipelineState -> {
                     try {
                         if 
(PipelineStatus.CANCELED.equals(pipelineState.getPipelineStatus())) {
-                            if (subPlan.canRestorePipeline()) {
-                                subPlan.restorePipeline();
-                                return;
-                            }
                             canceledPipelineNum.incrementAndGet();
                             if (makeJobEndWhenPipelineEnded) {
                                 LOGGER.info(
@@ -154,14 +150,6 @@ public class PhysicalPlan {
                             }
                         } else if (PipelineStatus.FAILED.equals(
                                 pipelineState.getPipelineStatus())) {
-                            if (subPlan.canRestorePipeline()) {
-                                LOGGER.info(
-                                        String.format(
-                                                "Can restore pipeline %s",
-                                                
subPlan.getPipelineFullName()));
-                                subPlan.restorePipeline();
-                                return;
-                            }
                             failedPipelineNum.incrementAndGet();
                             errorBySubPlan.compareAndSet(null, 
pipelineState.getThrowableMsg());
                             if (makeJobEndWhenPipelineEnded) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index 586afbec3..ef019d610 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.engine.server.dag.physical;
 
 import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.common.utils.RetryUtils;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
@@ -40,6 +41,8 @@ import org.apache.commons.lang3.StringUtils;
 
 import com.hazelcast.cluster.Address;
 import com.hazelcast.cluster.Member;
+import com.hazelcast.core.HazelcastInstanceNotActiveException;
+import com.hazelcast.core.OperationTimeoutException;
 import com.hazelcast.flakeidgen.FlakeIdGenerator;
 import com.hazelcast.logging.ILogger;
 import com.hazelcast.logging.Logger;
@@ -100,6 +103,8 @@ public class PhysicalVertex {
 
     private JobMaster jobMaster;
 
+    private volatile ExecutionState currExecutionState = 
ExecutionState.CREATED;
+
     public PhysicalVertex(
             int subTaskGroupIndex,
             @NonNull ExecutorService executorService,
@@ -135,6 +140,8 @@ public class PhysicalVertex {
             runningJobStateIMap.put(taskGroupLocation, ExecutionState.CREATED);
         }
 
+        this.currExecutionState = (ExecutionState) 
runningJobStateIMap.get(taskGroupLocation);
+
         this.nodeEngine = nodeEngine;
         if (LOGGER.isFineEnabled() || LOGGER.isFinestEnabled()) {
             this.taskFullName =
@@ -169,18 +176,18 @@ public class PhysicalVertex {
 
     public PassiveCompletableFuture<TaskExecutionState> initStateFuture() {
         this.taskFuture = new CompletableFuture<>();
-        ExecutionState executionState = (ExecutionState) 
runningJobStateIMap.get(taskGroupLocation);
-        if (executionState != null) {
+        this.currExecutionState = (ExecutionState) 
runningJobStateIMap.get(taskGroupLocation);
+        if (currExecutionState != null) {
             LOGGER.info(
                     String.format(
                             "The task %s is in state %s when init state 
future",
-                            taskFullName, executionState));
+                            taskFullName, currExecutionState));
         }
         // if the task state is RUNNING
         // We need to check the real running status of Task from 
taskExecutionServer.
         // Because the state may be RUNNING when the cluster is restarted, but 
the Task no longer
         // exists.
-        if (ExecutionState.RUNNING.equals(executionState)) {
+        if (ExecutionState.RUNNING.equals(currExecutionState)) {
             if (!checkTaskGroupIsExecuting(taskGroupLocation)) {
                 updateTaskState(ExecutionState.RUNNING, ExecutionState.FAILED);
                 this.taskFuture.complete(
@@ -188,10 +195,10 @@ public class PhysicalVertex {
             }
         }
         // If the task state is CANCELING we need call 
noticeTaskExecutionServiceCancel().
-        else if (ExecutionState.CANCELING.equals(executionState)) {
+        else if (ExecutionState.CANCELING.equals(currExecutionState)) {
             noticeTaskExecutionServiceCancel();
-        } else if (executionState.isEndState()) {
-            this.taskFuture.complete(new TaskExecutionState(taskGroupLocation, 
executionState));
+        } else if (currExecutionState.isEndState()) {
+            this.taskFuture.complete(new TaskExecutionState(taskGroupLocation, 
currExecutionState));
         }
         return new PassiveCompletableFuture<>(this.taskFuture);
     }
@@ -248,7 +255,7 @@ public class PhysicalVertex {
         return null;
     }
 
-    private TaskDeployState deployOnLocal(@NonNull SlotProfile slotProfile) {
+    private TaskDeployState deployOnLocal(@NonNull SlotProfile slotProfile) 
throws Exception {
         return deployInternal(
                 taskGroupImmutableInformation -> {
                     SeaTunnelServer server = 
nodeEngine.getService(SeaTunnelServer.SERVICE_NAME);
@@ -259,7 +266,7 @@ public class PhysicalVertex {
                 });
     }
 
-    private TaskDeployState deployOnRemote(@NonNull SlotProfile slotProfile) {
+    private TaskDeployState deployOnRemote(@NonNull SlotProfile slotProfile) 
throws Exception {
         return deployInternal(
                 taskGroupImmutableInformation -> {
                     try {
@@ -300,9 +307,7 @@ public class PhysicalVertex {
         TaskGroupImmutableInformation taskGroupImmutableInformation =
                 getTaskGroupImmutableInformation();
         synchronized (this) {
-            ExecutionState currentState =
-                    (ExecutionState) 
runningJobStateIMap.get(taskGroupLocation);
-            if (ExecutionState.DEPLOYING.equals(currentState)) {
+            if (ExecutionState.DEPLOYING.equals(currExecutionState)) {
                 TaskDeployState state = 
taskGroupConsumer.apply(taskGroupImmutableInformation);
                 updateTaskState(ExecutionState.DEPLOYING, 
ExecutionState.RUNNING);
                 return state;
@@ -339,22 +344,45 @@ public class PhysicalVertex {
                 return false;
             }
             // consistency check
-            ExecutionState currentState =
-                    (ExecutionState) 
runningJobStateIMap.get(taskGroupLocation);
-            if (currentState.equals(endState)) {
+            if (currExecutionState.equals(endState)) {
                 return true;
             }
-            if (currentState.isEndState()) {
+            if (currExecutionState.isEndState()) {
                 String message =
                         String.format(
                                 "Task %s is already in terminal state %s",
-                                taskFullName, currentState);
+                                taskFullName, currExecutionState);
                 LOGGER.warning(message);
                 return false;
             }
 
-            updateStateTimestamps(endState);
-            runningJobStateIMap.set(taskGroupLocation, endState);
+            try {
+                RetryUtils.retryWithException(
+                        () -> {
+                            updateStateTimestamps(endState);
+                            runningJobStateIMap.set(taskGroupLocation, 
endState);
+                            return null;
+                        },
+                        new RetryUtils.RetryMaterial(
+                                Constant.OPERATION_RETRY_TIME,
+                                true,
+                                exception ->
+                                        exception instanceof 
OperationTimeoutException
+                                                || exception
+                                                        instanceof
+                                                        
HazelcastInstanceNotActiveException
+                                                || exception instanceof 
InterruptedException,
+                                Constant.OPERATION_RETRY_SLEEP));
+            } catch (Exception e) {
+                LOGGER.warning(ExceptionUtils.getMessage(e));
+                // If master/worker node done, The job will restore and fix 
the state from
+                // TaskExecutionService
+                LOGGER.warning(
+                        String.format(
+                                "Set %s state %s to Imap failed, skip.",
+                                getTaskFullName(), endState));
+            }
+            this.currExecutionState = endState;
             LOGGER.info(String.format("%s turn to end state %s.", 
taskFullName, endState));
             return true;
         }
@@ -362,11 +390,11 @@ public class PhysicalVertex {
 
     public boolean updateTaskState(
             @NonNull ExecutionState current, @NonNull ExecutionState 
targetState) {
-        LOGGER.fine(
-                String.format(
-                        "Try to update the task %s state from %s to %s",
-                        taskFullName, current, targetState));
         synchronized (this) {
+            LOGGER.fine(
+                    String.format(
+                            "Try to update the task %s state from %s to %s",
+                            taskFullName, current, targetState));
             // consistency check
             if (current.isEndState()) {
                 String message = "Task is trying to leave terminal state " + 
current;
@@ -396,9 +424,34 @@ public class PhysicalVertex {
             }
 
             // now do the actual state transition
-            if (current.equals(runningJobStateIMap.get(taskGroupLocation))) {
-                updateStateTimestamps(targetState);
-                runningJobStateIMap.set(taskGroupLocation, targetState);
+            if (current.equals(currExecutionState)) {
+                try {
+                    RetryUtils.retryWithException(
+                            () -> {
+                                updateStateTimestamps(targetState);
+                                runningJobStateIMap.set(taskGroupLocation, 
targetState);
+                                return null;
+                            },
+                            new RetryUtils.RetryMaterial(
+                                    Constant.OPERATION_RETRY_TIME,
+                                    true,
+                                    exception ->
+                                            exception instanceof 
OperationTimeoutException
+                                                    || exception
+                                                            instanceof
+                                                            
HazelcastInstanceNotActiveException
+                                                    || exception instanceof 
InterruptedException,
+                                    Constant.OPERATION_RETRY_SLEEP));
+                } catch (Exception e) {
+                    LOGGER.warning(ExceptionUtils.getMessage(e));
+                    // If master/worker node done, The job will restore and 
fix the state from
+                    // TaskExecutionService
+                    LOGGER.warning(
+                            String.format(
+                                    "Set %s state %s to Imap failed, skip.",
+                                    getTaskFullName(), targetState));
+                }
+                this.currExecutionState = targetState;
                 LOGGER.info(
                         String.format(
                                 "%s turn from state %s to %s.",
@@ -408,7 +461,7 @@ public class PhysicalVertex {
                 LOGGER.warning(
                         String.format(
                                 "The task %s state in Imap is %s, not equals 
expected state %s",
-                                taskFullName, 
runningJobStateIMap.get(taskGroupLocation), current));
+                                taskFullName, currExecutionState, current));
                 return false;
             }
         }
@@ -490,7 +543,7 @@ public class PhysicalVertex {
     }
 
     public ExecutionState getExecutionState() {
-        return (ExecutionState) runningJobStateIMap.get(taskGroupLocation);
+        return currExecutionState;
     }
 
     private void resetExecutionState() {
@@ -504,8 +557,33 @@ public class PhysicalVertex {
                 LOGGER.severe(message);
                 throw new IllegalStateException(message);
             }
-            updateStateTimestamps(ExecutionState.CREATED);
-            runningJobStateIMap.set(taskGroupLocation, ExecutionState.CREATED);
+            try {
+                RetryUtils.retryWithException(
+                        () -> {
+                            updateStateTimestamps(ExecutionState.CREATED);
+                            runningJobStateIMap.set(taskGroupLocation, 
ExecutionState.CREATED);
+                            return null;
+                        },
+                        new RetryUtils.RetryMaterial(
+                                Constant.OPERATION_RETRY_TIME,
+                                true,
+                                exception ->
+                                        exception instanceof 
OperationTimeoutException
+                                                || exception
+                                                        instanceof
+                                                        
HazelcastInstanceNotActiveException
+                                                || exception instanceof 
InterruptedException,
+                                Constant.OPERATION_RETRY_SLEEP));
+            } catch (Exception e) {
+                LOGGER.warning(ExceptionUtils.getMessage(e));
+                // If master/worker node done, The job will restore and fix 
the state from
+                // TaskExecutionService
+                LOGGER.warning(
+                        String.format(
+                                "Set %s state %s to Imap failed, skip.",
+                                getTaskFullName(), ExecutionState.CREATED));
+            }
+            this.currExecutionState = ExecutionState.CREATED;
             LOGGER.info(
                     String.format("%s turn to state %s.", taskFullName, 
ExecutionState.CREATED));
         }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index 1540db61f..6a1938766 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -18,6 +18,8 @@
 package org.apache.seatunnel.engine.server.dag.physical;
 
 import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.core.job.PipelineExecutionState;
@@ -97,6 +99,8 @@ public class SubPlan {
 
     private final Object restoreLock = new Object();
 
+    private volatile PipelineStatus currPipelineStatus = 
PipelineStatus.INITIALIZING;
+
     public SubPlan(
             int pipelineId,
             int totalPipelineNum,
@@ -130,6 +134,8 @@ public class SubPlan {
             runningJobStateIMap.put(pipelineLocation, PipelineStatus.CREATED);
         }
 
+        this.currPipelineStatus = (PipelineStatus) 
runningJobStateIMap.get(pipelineLocation);
+
         this.pipelineFullName =
                 String.format(
                         "Job %s (%s), Pipeline: [(%d/%d)]",
@@ -181,67 +187,32 @@ public class SubPlan {
 
                         if (finishedTaskNum.incrementAndGet()
                                 == (physicalVertexList.size() + 
coordinatorVertexList.size())) {
-                            PipelineStatus pipelineStatus = null;
-                            if (failedTaskNum.get() > 0) {
-                                pipelineStatus = PipelineStatus.FAILED;
-                                // we don't care the checkpoint error reason 
when the task is
-                                // failed.
-                                jobMaster
-                                        .getCheckpointManager()
-                                        .cancelCheckpoint(getPipelineId())
-                                        .join();
-                            } else if (canceledTaskNum.get() > 0) {
-                                pipelineStatus = PipelineStatus.CANCELED;
-                                CheckpointCoordinatorState 
checkpointCoordinatorState =
-                                        jobMaster
-                                                .getCheckpointManager()
-                                                
.cancelCheckpoint(getPipelineId())
-                                                .join();
-                                if (CheckpointCoordinatorStatus.FAILED.equals(
-                                        checkpointCoordinatorState
-                                                
.getCheckpointCoordinatorStatus())) {
-                                    pipelineStatus = PipelineStatus.FAILED;
-                                    errorByPhysicalVertex.compareAndSet(
-                                            null, 
checkpointCoordinatorState.getThrowableMsg());
-                                }
-                            } else {
-                                pipelineStatus = PipelineStatus.FINISHED;
-                                CheckpointCoordinatorState 
checkpointCoordinatorState =
-                                        jobMaster
-                                                .getCheckpointManager()
-                                                
.waitCheckpointCoordinatorComplete(getPipelineId())
-                                                .join();
-
-                                if (CheckpointCoordinatorStatus.FAILED.equals(
-                                        checkpointCoordinatorState
-                                                
.getCheckpointCoordinatorStatus())) {
-                                    pipelineStatus = PipelineStatus.FAILED;
-                                    errorByPhysicalVertex.compareAndSet(
-                                            null, 
checkpointCoordinatorState.getThrowableMsg());
-                                } else if 
(CheckpointCoordinatorStatus.CANCELED.equals(
-                                        checkpointCoordinatorState
-                                                
.getCheckpointCoordinatorStatus())) {
-                                    pipelineStatus = PipelineStatus.CANCELED;
-                                    errorByPhysicalVertex.compareAndSet(
-                                            null, 
checkpointCoordinatorState.getThrowableMsg());
-                                }
-                            }
-
-                            if (!checkNeedRestore(pipelineStatus)) {
-                                subPlanDone(pipelineStatus);
-                            }
-
-                            turnToEndState(pipelineStatus);
+                            PipelineStatus pipelineEndState = 
getPipelineEndState();
                             LOGGER.info(
                                     String.format(
                                             "%s end with state %s",
-                                            this.pipelineFullName, 
pipelineStatus));
-
-                            pipelineFuture.complete(
-                                    new PipelineExecutionState(
-                                            pipelineId,
-                                            pipelineStatus,
-                                            errorByPhysicalVertex.get()));
+                                            this.pipelineFullName, 
pipelineEndState));
+
+                            if (!checkNeedRestore(pipelineEndState)) {
+                                subPlanDone(pipelineEndState);
+                                turnToEndState(pipelineEndState);
+                                pipelineFuture.complete(
+                                        new PipelineExecutionState(
+                                                pipelineId,
+                                                pipelineEndState,
+                                                errorByPhysicalVertex.get()));
+                            } else {
+                                turnToEndState(pipelineEndState);
+                                if (prepareRestorePipeline()) {
+                                    restorePipeline();
+                                } else {
+                                    pipelineFuture.complete(
+                                            new PipelineExecutionState(
+                                                    pipelineId,
+                                                    pipelineEndState,
+                                                    
errorByPhysicalVertex.get()));
+                                }
+                            }
                         }
                     } catch (Throwable e) {
                         LOGGER.severe(
@@ -255,6 +226,46 @@ public class SubPlan {
                 executorService);
     }
 
+    private PipelineStatus getPipelineEndState() {
+        PipelineStatus pipelineStatus = null;
+        if (failedTaskNum.get() > 0) {
+            pipelineStatus = PipelineStatus.FAILED;
+            // we don't care the checkpoint error reason when the task is
+            // failed.
+            
jobMaster.getCheckpointManager().cancelCheckpoint(getPipelineId()).join();
+        } else if (canceledTaskNum.get() > 0) {
+            pipelineStatus = PipelineStatus.CANCELED;
+            CheckpointCoordinatorState checkpointCoordinatorState =
+                    
jobMaster.getCheckpointManager().cancelCheckpoint(getPipelineId()).join();
+            if (CheckpointCoordinatorStatus.FAILED.equals(
+                    
checkpointCoordinatorState.getCheckpointCoordinatorStatus())) {
+                pipelineStatus = PipelineStatus.FAILED;
+                errorByPhysicalVertex.compareAndSet(
+                        null, checkpointCoordinatorState.getThrowableMsg());
+            }
+        } else {
+            pipelineStatus = PipelineStatus.FINISHED;
+            CheckpointCoordinatorState checkpointCoordinatorState =
+                    jobMaster
+                            .getCheckpointManager()
+                            .waitCheckpointCoordinatorComplete(getPipelineId())
+                            .join();
+
+            if (CheckpointCoordinatorStatus.FAILED.equals(
+                    
checkpointCoordinatorState.getCheckpointCoordinatorStatus())) {
+                pipelineStatus = PipelineStatus.FAILED;
+                errorByPhysicalVertex.compareAndSet(
+                        null, checkpointCoordinatorState.getThrowableMsg());
+            } else if (CheckpointCoordinatorStatus.CANCELED.equals(
+                    
checkpointCoordinatorState.getCheckpointCoordinatorStatus())) {
+                pipelineStatus = PipelineStatus.CANCELED;
+                errorByPhysicalVertex.compareAndSet(
+                        null, checkpointCoordinatorState.getThrowableMsg());
+            }
+        }
+        return pipelineStatus;
+    }
+
     private boolean checkNeedRestore(PipelineStatus pipelineStatus) {
         return canRestorePipeline() && 
!PipelineStatus.FINISHED.equals(pipelineStatus);
     }
@@ -270,23 +281,35 @@ public class SubPlan {
                 .join();
     }
 
-    private void subPlanDone(PipelineStatus pipelineStatus) {
-        jobMaster.savePipelineMetricsToHistory(getPipelineLocation());
-        jobMaster.removeMetricsContext(getPipelineLocation(), pipelineStatus);
-        jobMaster.releasePipelineResource(this);
-        notifyCheckpointManagerPipelineEnd(pipelineStatus);
+    private void subPlanDone(PipelineStatus pipelineStatus) throws Exception {
+        RetryUtils.retryWithException(
+                () -> {
+                    
jobMaster.savePipelineMetricsToHistory(getPipelineLocation());
+                    jobMaster.removeMetricsContext(getPipelineLocation(), 
pipelineStatus);
+                    jobMaster.releasePipelineResource(this);
+                    notifyCheckpointManagerPipelineEnd(pipelineStatus);
+                    return null;
+                },
+                new RetryUtils.RetryMaterial(
+                        Constant.OPERATION_RETRY_TIME,
+                        true,
+                        exception ->
+                                exception instanceof OperationTimeoutException
+                                        || exception instanceof 
HazelcastInstanceNotActiveException
+                                        || exception instanceof 
InterruptedException,
+                        Constant.OPERATION_RETRY_SLEEP));
     }
 
     public boolean canRestorePipeline() {
         return jobMaster.isNeedRestore() && getPipelineRestoreNum() < 
PIPELINE_MAX_RESTORE_NUM;
     }
 
-    private void turnToEndState(@NonNull PipelineStatus endState) {
+    private void turnToEndState(@NonNull PipelineStatus endState) throws 
Exception {
         synchronized (this) {
             // consistency check
-            PipelineStatus current = (PipelineStatus) 
runningJobStateIMap.get(pipelineLocation);
-            if (current.isEndState() && !endState.isEndState()) {
-                String message = "Pipeline is trying to leave terminal state " 
+ current;
+            if (this.currPipelineStatus.isEndState() && 
!endState.isEndState()) {
+                String message =
+                        "Pipeline is trying to leave terminal state " + 
this.currPipelineStatus;
                 LOGGER.severe(message);
                 throw new IllegalStateException(message);
             }
@@ -299,14 +322,27 @@ public class SubPlan {
 
             // we must update runningJobStateTimestampsIMap first and then can 
update
             // runningJobStateIMap
-            updateStateTimestamps(endState);
-
-            runningJobStateIMap.set(pipelineLocation, endState);
+            RetryUtils.retryWithException(
+                    () -> {
+                        updateStateTimestamps(endState);
+                        runningJobStateIMap.set(pipelineLocation, endState);
+                        return null;
+                    },
+                    new RetryUtils.RetryMaterial(
+                            Constant.OPERATION_RETRY_TIME,
+                            true,
+                            exception ->
+                                    exception instanceof 
OperationTimeoutException
+                                            || exception
+                                                    instanceof 
HazelcastInstanceNotActiveException
+                                            || exception instanceof 
InterruptedException,
+                            Constant.OPERATION_RETRY_SLEEP));
+            this.currPipelineStatus = endState;
         }
     }
 
     public boolean updatePipelineState(
-            @NonNull PipelineStatus current, @NonNull PipelineStatus 
targetState) {
+            @NonNull PipelineStatus current, @NonNull PipelineStatus 
targetState) throws Exception {
         synchronized (this) {
             // consistency check
             if (current.isEndState()) {
@@ -345,8 +381,23 @@ public class SubPlan {
 
                 // we must update runningJobStateTimestampsIMap first and then 
can update
                 // runningJobStateIMap
-                updateStateTimestamps(targetState);
-                runningJobStateIMap.set(pipelineLocation, targetState);
+                RetryUtils.retryWithException(
+                        () -> {
+                            updateStateTimestamps(targetState);
+                            runningJobStateIMap.set(pipelineLocation, 
targetState);
+                            return null;
+                        },
+                        new RetryUtils.RetryMaterial(
+                                Constant.OPERATION_RETRY_TIME,
+                                true,
+                                exception ->
+                                        exception instanceof 
OperationTimeoutException
+                                                || exception
+                                                        instanceof
+                                                        
HazelcastInstanceNotActiveException
+                                                || exception instanceof 
InterruptedException,
+                                Constant.OPERATION_RETRY_SLEEP));
+                this.currPipelineStatus = targetState;
                 return true;
             } else {
                 return false;
@@ -436,15 +487,11 @@ public class SubPlan {
                     },
                     executorService);
         }
-        LOGGER.info(
-                String.format(
-                        "can not cancel task %s because it is in state %s ",
-                        task.getTaskFullName(), task.getExecutionState()));
         return null;
     }
 
     /** Before restore a pipeline, the pipeline must do reset */
-    private synchronized void reset() {
+    private synchronized void reset() throws Exception {
         resetPipelineState();
         finishedTaskNum.set(0);
         canceledTaskNum.set(0);
@@ -463,23 +510,51 @@ public class SubPlan {
         runningJobStateTimestampsIMap.set(pipelineLocation, stateTimestamps);
     }
 
-    private void resetPipelineState() {
-        PipelineStatus pipelineState = getPipelineState();
-        if (!pipelineState.isEndState()) {
-            String message =
-                    String.format(
-                            "%s reset state failed, only end state can be reset, current is %s",
-                            getPipelineFullName(), pipelineState);
-            LOGGER.severe(message);
-            throw new IllegalStateException(message);
-        }
+    /**
+     * Reset the pipeline state to {@link PipelineStatus#CREATED}.
+     *
+     * <p>The retried step throws {@link IllegalStateException} when the current state is not an
+     * end state; otherwise it updates the state timestamps, {@code runningJobStateIMap} and the
+     * cached {@code currPipelineStatus}, in that order. The retry predicate passed to {@link
+     * RetryUtils} accepts {@code OperationTimeoutException}, {@code
+     * HazelcastInstanceNotActiveException} and {@code InterruptedException}.
+     *
+     * @throws Exception whatever {@link RetryUtils#retryWithException} propagates
+     */
+    private void resetPipelineState() throws Exception {
+        RetryUtils.retryWithException(
+                () -> {
+                    PipelineStatus pipelineState = getPipelineState();
+                    if (!pipelineState.isEndState()) {
+                        String message =
+                                String.format(
+                                        "%s reset state failed, only end state can be reset, current is %s",
+                                        getPipelineFullName(), pipelineState);
+                        LOGGER.severe(message);
+                        throw new IllegalStateException(message);
+                    }
 
-        updateStateTimestamps(PipelineStatus.CREATED);
-        runningJobStateIMap.set(pipelineLocation, PipelineStatus.CREATED);
+                    updateStateTimestamps(PipelineStatus.CREATED);
+                    runningJobStateIMap.set(pipelineLocation, PipelineStatus.CREATED);
+                    this.currPipelineStatus = PipelineStatus.CREATED;
+                    return null;
+                },
+                new RetryUtils.RetryMaterial(
+                        Constant.OPERATION_RETRY_TIME,
+                        true,
+                        exception ->
+                                exception instanceof OperationTimeoutException
+                                        || exception instanceof HazelcastInstanceNotActiveException
+                                        || exception instanceof InterruptedException,
+                        Constant.OPERATION_RETRY_SLEEP));
     }
 
-    /** restore the pipeline when pipeline failed or canceled by error. */
-    public void restorePipeline() {
+    /**
+     * reset the pipeline and task state and init state future again
+     *
+     * @return
+     */
+    private boolean prepareRestorePipeline() {
         synchronized (restoreLock) {
             try {
                 pipelineRestoreNum++;
@@ -497,9 +562,24 @@ public class SubPlan {
                 }
                 reset();
                 jobMaster.getPhysicalPlan().addPipelineEndCallback(this);
+                return true;
+            } catch (Throwable e) {
+                if (this.currPipelineStatus.isEndState()) {
+                    // restore failed
+                    return false;
+                }
+                jobMaster.getPhysicalPlan().addPipelineEndCallback(this);
+                return true;
+            }
+        }
+    }
+
+    /** restore the pipeline when pipeline failed or canceled by error. */
+    public void restorePipeline() {
+        synchronized (restoreLock) {
+            try {
                 if 
(jobMaster.getCheckpointManager().isCompletedPipeline(pipelineId)) {
                     forcePipelineFinish();
-                    return;
                 }
                 
jobMaster.getCheckpointManager().reportedPipelineRunning(pipelineId, false);
                 reSchedulerPipelineFuture = 
jobMaster.reSchedulerPipeline(this);
@@ -560,7 +640,14 @@ public class SubPlan {
     }
 
+    /**
+     * Return the pipeline status cached on this SubPlan rather than reading {@code
+     * runningJobStateIMap}; state transitions such as updatePipelineState write the IMap first.
+     *
+     * <p>NOTE(review): this read is not inside {@code synchronized (this)}, which guards the
+     * writes in {@code updatePipelineState}; confirm {@code currPipelineStatus} is volatile.
+     */
     public PipelineStatus getPipelineState() {
-        return (PipelineStatus) runningJobStateIMap.get(pipelineLocation);
+        return this.currPipelineStatus;
     }
 
     public PipelineLocation getPipelineLocation() {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
index fc6aff87f..ab5ad8f82 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
@@ -291,8 +291,19 @@ public class PipelineBaseScheduler implements JobScheduler {
 
     private void deployPipeline(
             @NonNull SubPlan pipeline, Map<TaskGroupLocation, SlotProfile> 
slotProfiles) {
-        if (pipeline.updatePipelineState(PipelineStatus.SCHEDULED, 
PipelineStatus.DEPLOYING)) {
-
+        boolean changeStateSuccess = false;
+        try {
+            changeStateSuccess =
+                    pipeline.updatePipelineState(
+                            PipelineStatus.SCHEDULED, 
PipelineStatus.DEPLOYING);
+        } catch (Exception e) {
+            log.warn(
+                    "{} turn to state {} failed, cancel pipeline",
+                    pipeline.getPipelineFullName(),
+                    PipelineStatus.DEPLOYING);
+            pipeline.cancelPipeline();
+        }
+        if (changeStateSuccess) {
             try {
                 List<CompletableFuture<?>> deployCoordinatorFuture =
                         pipeline.getCoordinatorVertexList().stream()


Reply via email to