This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new acabe809d [Hotfix][Zeta] fix pipeline state not right bug (#4823)
acabe809d is described below
commit acabe809da3945fabac6fa14f966032eabb0b9b2
Author: Eric <[email protected]>
AuthorDate: Tue May 30 17:24:34 2023 +0800
[Hotfix][Zeta] fix pipeline state not right bug (#4823)
---
.../engine/server/dag/physical/PhysicalPlan.java | 12 -
.../engine/server/dag/physical/PhysicalVertex.java | 138 ++++++++---
.../engine/server/dag/physical/SubPlan.java | 268 +++++++++++++--------
.../server/scheduler/PipelineBaseScheduler.java | 15 +-
4 files changed, 295 insertions(+), 138 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index 4a81fdf63..ed3ec5c8c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -140,10 +140,6 @@ public class PhysicalPlan {
pipelineState -> {
try {
if
(PipelineStatus.CANCELED.equals(pipelineState.getPipelineStatus())) {
- if (subPlan.canRestorePipeline()) {
- subPlan.restorePipeline();
- return;
- }
canceledPipelineNum.incrementAndGet();
if (makeJobEndWhenPipelineEnded) {
LOGGER.info(
@@ -154,14 +150,6 @@ public class PhysicalPlan {
}
} else if (PipelineStatus.FAILED.equals(
pipelineState.getPipelineStatus())) {
- if (subPlan.canRestorePipeline()) {
- LOGGER.info(
- String.format(
- "Can restore pipeline %s",
-
subPlan.getPipelineFullName()));
- subPlan.restorePipeline();
- return;
- }
failedPipelineNum.incrementAndGet();
errorBySubPlan.compareAndSet(null,
pipelineState.getThrowableMsg());
if (makeJobEndWhenPipelineEnded) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index 586afbec3..ef019d610 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.engine.server.dag.physical;
import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
@@ -40,6 +41,8 @@ import org.apache.commons.lang3.StringUtils;
import com.hazelcast.cluster.Address;
import com.hazelcast.cluster.Member;
+import com.hazelcast.core.HazelcastInstanceNotActiveException;
+import com.hazelcast.core.OperationTimeoutException;
import com.hazelcast.flakeidgen.FlakeIdGenerator;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
@@ -100,6 +103,8 @@ public class PhysicalVertex {
private JobMaster jobMaster;
+ private volatile ExecutionState currExecutionState =
ExecutionState.CREATED;
+
public PhysicalVertex(
int subTaskGroupIndex,
@NonNull ExecutorService executorService,
@@ -135,6 +140,8 @@ public class PhysicalVertex {
runningJobStateIMap.put(taskGroupLocation, ExecutionState.CREATED);
}
+ this.currExecutionState = (ExecutionState)
runningJobStateIMap.get(taskGroupLocation);
+
this.nodeEngine = nodeEngine;
if (LOGGER.isFineEnabled() || LOGGER.isFinestEnabled()) {
this.taskFullName =
@@ -169,18 +176,18 @@ public class PhysicalVertex {
public PassiveCompletableFuture<TaskExecutionState> initStateFuture() {
this.taskFuture = new CompletableFuture<>();
- ExecutionState executionState = (ExecutionState)
runningJobStateIMap.get(taskGroupLocation);
- if (executionState != null) {
+ this.currExecutionState = (ExecutionState)
runningJobStateIMap.get(taskGroupLocation);
+ if (currExecutionState != null) {
LOGGER.info(
String.format(
"The task %s is in state %s when init state
future",
- taskFullName, executionState));
+ taskFullName, currExecutionState));
}
// if the task state is RUNNING
// We need to check the real running status of Task from
taskExecutionServer.
// Because the state may be RUNNING when the cluster is restarted, but
the Task no longer
// exists.
- if (ExecutionState.RUNNING.equals(executionState)) {
+ if (ExecutionState.RUNNING.equals(currExecutionState)) {
if (!checkTaskGroupIsExecuting(taskGroupLocation)) {
updateTaskState(ExecutionState.RUNNING, ExecutionState.FAILED);
this.taskFuture.complete(
@@ -188,10 +195,10 @@ public class PhysicalVertex {
}
}
// If the task state is CANCELING we need call
noticeTaskExecutionServiceCancel().
- else if (ExecutionState.CANCELING.equals(executionState)) {
+ else if (ExecutionState.CANCELING.equals(currExecutionState)) {
noticeTaskExecutionServiceCancel();
- } else if (executionState.isEndState()) {
- this.taskFuture.complete(new TaskExecutionState(taskGroupLocation,
executionState));
+ } else if (currExecutionState.isEndState()) {
+ this.taskFuture.complete(new TaskExecutionState(taskGroupLocation,
currExecutionState));
}
return new PassiveCompletableFuture<>(this.taskFuture);
}
@@ -248,7 +255,7 @@ public class PhysicalVertex {
return null;
}
- private TaskDeployState deployOnLocal(@NonNull SlotProfile slotProfile) {
+ private TaskDeployState deployOnLocal(@NonNull SlotProfile slotProfile)
throws Exception {
return deployInternal(
taskGroupImmutableInformation -> {
SeaTunnelServer server =
nodeEngine.getService(SeaTunnelServer.SERVICE_NAME);
@@ -259,7 +266,7 @@ public class PhysicalVertex {
});
}
- private TaskDeployState deployOnRemote(@NonNull SlotProfile slotProfile) {
+ private TaskDeployState deployOnRemote(@NonNull SlotProfile slotProfile)
throws Exception {
return deployInternal(
taskGroupImmutableInformation -> {
try {
@@ -300,9 +307,7 @@ public class PhysicalVertex {
TaskGroupImmutableInformation taskGroupImmutableInformation =
getTaskGroupImmutableInformation();
synchronized (this) {
- ExecutionState currentState =
- (ExecutionState)
runningJobStateIMap.get(taskGroupLocation);
- if (ExecutionState.DEPLOYING.equals(currentState)) {
+ if (ExecutionState.DEPLOYING.equals(currExecutionState)) {
TaskDeployState state =
taskGroupConsumer.apply(taskGroupImmutableInformation);
updateTaskState(ExecutionState.DEPLOYING,
ExecutionState.RUNNING);
return state;
@@ -339,22 +344,45 @@ public class PhysicalVertex {
return false;
}
// consistency check
- ExecutionState currentState =
- (ExecutionState)
runningJobStateIMap.get(taskGroupLocation);
- if (currentState.equals(endState)) {
+ if (currExecutionState.equals(endState)) {
return true;
}
- if (currentState.isEndState()) {
+ if (currExecutionState.isEndState()) {
String message =
String.format(
"Task %s is already in terminal state %s",
- taskFullName, currentState);
+ taskFullName, currExecutionState);
LOGGER.warning(message);
return false;
}
- updateStateTimestamps(endState);
- runningJobStateIMap.set(taskGroupLocation, endState);
+ try {
+ RetryUtils.retryWithException(
+ () -> {
+ updateStateTimestamps(endState);
+ runningJobStateIMap.set(taskGroupLocation,
endState);
+ return null;
+ },
+ new RetryUtils.RetryMaterial(
+ Constant.OPERATION_RETRY_TIME,
+ true,
+ exception ->
+ exception instanceof
OperationTimeoutException
+ || exception
+ instanceof
+
HazelcastInstanceNotActiveException
+ || exception instanceof
InterruptedException,
+ Constant.OPERATION_RETRY_SLEEP));
+ } catch (Exception e) {
+ LOGGER.warning(ExceptionUtils.getMessage(e));
+ // If master/worker node done, The job will restore and fix
the state from
+ // TaskExecutionService
+ LOGGER.warning(
+ String.format(
+ "Set %s state %s to Imap failed, skip.",
+ getTaskFullName(), endState));
+ }
+ this.currExecutionState = endState;
LOGGER.info(String.format("%s turn to end state %s.",
taskFullName, endState));
return true;
}
@@ -362,11 +390,11 @@ public class PhysicalVertex {
public boolean updateTaskState(
@NonNull ExecutionState current, @NonNull ExecutionState
targetState) {
- LOGGER.fine(
- String.format(
- "Try to update the task %s state from %s to %s",
- taskFullName, current, targetState));
synchronized (this) {
+ LOGGER.fine(
+ String.format(
+ "Try to update the task %s state from %s to %s",
+ taskFullName, current, targetState));
// consistency check
if (current.isEndState()) {
String message = "Task is trying to leave terminal state " +
current;
@@ -396,9 +424,34 @@ public class PhysicalVertex {
}
// now do the actual state transition
- if (current.equals(runningJobStateIMap.get(taskGroupLocation))) {
- updateStateTimestamps(targetState);
- runningJobStateIMap.set(taskGroupLocation, targetState);
+ if (current.equals(currExecutionState)) {
+ try {
+ RetryUtils.retryWithException(
+ () -> {
+ updateStateTimestamps(targetState);
+ runningJobStateIMap.set(taskGroupLocation,
targetState);
+ return null;
+ },
+ new RetryUtils.RetryMaterial(
+ Constant.OPERATION_RETRY_TIME,
+ true,
+ exception ->
+ exception instanceof
OperationTimeoutException
+ || exception
+ instanceof
+
HazelcastInstanceNotActiveException
+ || exception instanceof
InterruptedException,
+ Constant.OPERATION_RETRY_SLEEP));
+ } catch (Exception e) {
+ LOGGER.warning(ExceptionUtils.getMessage(e));
+ // If master/worker node done, The job will restore and
fix the state from
+ // TaskExecutionService
+ LOGGER.warning(
+ String.format(
+ "Set %s state %s to Imap failed, skip.",
+ getTaskFullName(), targetState));
+ }
+ this.currExecutionState = targetState;
LOGGER.info(
String.format(
"%s turn from state %s to %s.",
@@ -408,7 +461,7 @@ public class PhysicalVertex {
LOGGER.warning(
String.format(
"The task %s state in Imap is %s, not equals
expected state %s",
- taskFullName,
runningJobStateIMap.get(taskGroupLocation), current));
+ taskFullName, currExecutionState, current));
return false;
}
}
@@ -490,7 +543,7 @@ public class PhysicalVertex {
}
public ExecutionState getExecutionState() {
- return (ExecutionState) runningJobStateIMap.get(taskGroupLocation);
+ return currExecutionState;
}
private void resetExecutionState() {
@@ -504,8 +557,33 @@ public class PhysicalVertex {
LOGGER.severe(message);
throw new IllegalStateException(message);
}
- updateStateTimestamps(ExecutionState.CREATED);
- runningJobStateIMap.set(taskGroupLocation, ExecutionState.CREATED);
+ try {
+ RetryUtils.retryWithException(
+ () -> {
+ updateStateTimestamps(ExecutionState.CREATED);
+ runningJobStateIMap.set(taskGroupLocation,
ExecutionState.CREATED);
+ return null;
+ },
+ new RetryUtils.RetryMaterial(
+ Constant.OPERATION_RETRY_TIME,
+ true,
+ exception ->
+ exception instanceof
OperationTimeoutException
+ || exception
+ instanceof
+
HazelcastInstanceNotActiveException
+ || exception instanceof
InterruptedException,
+ Constant.OPERATION_RETRY_SLEEP));
+ } catch (Exception e) {
+ LOGGER.warning(ExceptionUtils.getMessage(e));
+ // If master/worker node done, The job will restore and fix
the state from
+ // TaskExecutionService
+ LOGGER.warning(
+ String.format(
+ "Set %s state %s to Imap failed, skip.",
+ getTaskFullName(), ExecutionState.CREATED));
+ }
+ this.currExecutionState = ExecutionState.CREATED;
LOGGER.info(
String.format("%s turn to state %s.", taskFullName,
ExecutionState.CREATED));
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index 1540db61f..6a1938766 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -18,6 +18,8 @@
package org.apache.seatunnel.engine.server.dag.physical;
import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.PipelineExecutionState;
@@ -97,6 +99,8 @@ public class SubPlan {
private final Object restoreLock = new Object();
+ private volatile PipelineStatus currPipelineStatus =
PipelineStatus.INITIALIZING;
+
public SubPlan(
int pipelineId,
int totalPipelineNum,
@@ -130,6 +134,8 @@ public class SubPlan {
runningJobStateIMap.put(pipelineLocation, PipelineStatus.CREATED);
}
+ this.currPipelineStatus = (PipelineStatus)
runningJobStateIMap.get(pipelineLocation);
+
this.pipelineFullName =
String.format(
"Job %s (%s), Pipeline: [(%d/%d)]",
@@ -181,67 +187,32 @@ public class SubPlan {
if (finishedTaskNum.incrementAndGet()
== (physicalVertexList.size() +
coordinatorVertexList.size())) {
- PipelineStatus pipelineStatus = null;
- if (failedTaskNum.get() > 0) {
- pipelineStatus = PipelineStatus.FAILED;
- // we don't care the checkpoint error reason
when the task is
- // failed.
- jobMaster
- .getCheckpointManager()
- .cancelCheckpoint(getPipelineId())
- .join();
- } else if (canceledTaskNum.get() > 0) {
- pipelineStatus = PipelineStatus.CANCELED;
- CheckpointCoordinatorState
checkpointCoordinatorState =
- jobMaster
- .getCheckpointManager()
-
.cancelCheckpoint(getPipelineId())
- .join();
- if (CheckpointCoordinatorStatus.FAILED.equals(
- checkpointCoordinatorState
-
.getCheckpointCoordinatorStatus())) {
- pipelineStatus = PipelineStatus.FAILED;
- errorByPhysicalVertex.compareAndSet(
- null,
checkpointCoordinatorState.getThrowableMsg());
- }
- } else {
- pipelineStatus = PipelineStatus.FINISHED;
- CheckpointCoordinatorState
checkpointCoordinatorState =
- jobMaster
- .getCheckpointManager()
-
.waitCheckpointCoordinatorComplete(getPipelineId())
- .join();
-
- if (CheckpointCoordinatorStatus.FAILED.equals(
- checkpointCoordinatorState
-
.getCheckpointCoordinatorStatus())) {
- pipelineStatus = PipelineStatus.FAILED;
- errorByPhysicalVertex.compareAndSet(
- null,
checkpointCoordinatorState.getThrowableMsg());
- } else if
(CheckpointCoordinatorStatus.CANCELED.equals(
- checkpointCoordinatorState
-
.getCheckpointCoordinatorStatus())) {
- pipelineStatus = PipelineStatus.CANCELED;
- errorByPhysicalVertex.compareAndSet(
- null,
checkpointCoordinatorState.getThrowableMsg());
- }
- }
-
- if (!checkNeedRestore(pipelineStatus)) {
- subPlanDone(pipelineStatus);
- }
-
- turnToEndState(pipelineStatus);
+ PipelineStatus pipelineEndState =
getPipelineEndState();
LOGGER.info(
String.format(
"%s end with state %s",
- this.pipelineFullName,
pipelineStatus));
-
- pipelineFuture.complete(
- new PipelineExecutionState(
- pipelineId,
- pipelineStatus,
- errorByPhysicalVertex.get()));
+ this.pipelineFullName,
pipelineEndState));
+
+ if (!checkNeedRestore(pipelineEndState)) {
+ subPlanDone(pipelineEndState);
+ turnToEndState(pipelineEndState);
+ pipelineFuture.complete(
+ new PipelineExecutionState(
+ pipelineId,
+ pipelineEndState,
+ errorByPhysicalVertex.get()));
+ } else {
+ turnToEndState(pipelineEndState);
+ if (prepareRestorePipeline()) {
+ restorePipeline();
+ } else {
+ pipelineFuture.complete(
+ new PipelineExecutionState(
+ pipelineId,
+ pipelineEndState,
+
errorByPhysicalVertex.get()));
+ }
+ }
}
} catch (Throwable e) {
LOGGER.severe(
@@ -255,6 +226,46 @@ public class SubPlan {
executorService);
}
+ private PipelineStatus getPipelineEndState() {
+ PipelineStatus pipelineStatus = null;
+ if (failedTaskNum.get() > 0) {
+ pipelineStatus = PipelineStatus.FAILED;
+ // we don't care the checkpoint error reason when the task is
+ // failed.
+
jobMaster.getCheckpointManager().cancelCheckpoint(getPipelineId()).join();
+ } else if (canceledTaskNum.get() > 0) {
+ pipelineStatus = PipelineStatus.CANCELED;
+ CheckpointCoordinatorState checkpointCoordinatorState =
+
jobMaster.getCheckpointManager().cancelCheckpoint(getPipelineId()).join();
+ if (CheckpointCoordinatorStatus.FAILED.equals(
+
checkpointCoordinatorState.getCheckpointCoordinatorStatus())) {
+ pipelineStatus = PipelineStatus.FAILED;
+ errorByPhysicalVertex.compareAndSet(
+ null, checkpointCoordinatorState.getThrowableMsg());
+ }
+ } else {
+ pipelineStatus = PipelineStatus.FINISHED;
+ CheckpointCoordinatorState checkpointCoordinatorState =
+ jobMaster
+ .getCheckpointManager()
+ .waitCheckpointCoordinatorComplete(getPipelineId())
+ .join();
+
+ if (CheckpointCoordinatorStatus.FAILED.equals(
+
checkpointCoordinatorState.getCheckpointCoordinatorStatus())) {
+ pipelineStatus = PipelineStatus.FAILED;
+ errorByPhysicalVertex.compareAndSet(
+ null, checkpointCoordinatorState.getThrowableMsg());
+ } else if (CheckpointCoordinatorStatus.CANCELED.equals(
+
checkpointCoordinatorState.getCheckpointCoordinatorStatus())) {
+ pipelineStatus = PipelineStatus.CANCELED;
+ errorByPhysicalVertex.compareAndSet(
+ null, checkpointCoordinatorState.getThrowableMsg());
+ }
+ }
+ return pipelineStatus;
+ }
+
private boolean checkNeedRestore(PipelineStatus pipelineStatus) {
return canRestorePipeline() &&
!PipelineStatus.FINISHED.equals(pipelineStatus);
}
@@ -270,23 +281,35 @@ public class SubPlan {
.join();
}
- private void subPlanDone(PipelineStatus pipelineStatus) {
- jobMaster.savePipelineMetricsToHistory(getPipelineLocation());
- jobMaster.removeMetricsContext(getPipelineLocation(), pipelineStatus);
- jobMaster.releasePipelineResource(this);
- notifyCheckpointManagerPipelineEnd(pipelineStatus);
+ private void subPlanDone(PipelineStatus pipelineStatus) throws Exception {
+ RetryUtils.retryWithException(
+ () -> {
+
jobMaster.savePipelineMetricsToHistory(getPipelineLocation());
+ jobMaster.removeMetricsContext(getPipelineLocation(),
pipelineStatus);
+ jobMaster.releasePipelineResource(this);
+ notifyCheckpointManagerPipelineEnd(pipelineStatus);
+ return null;
+ },
+ new RetryUtils.RetryMaterial(
+ Constant.OPERATION_RETRY_TIME,
+ true,
+ exception ->
+ exception instanceof OperationTimeoutException
+ || exception instanceof
HazelcastInstanceNotActiveException
+ || exception instanceof
InterruptedException,
+ Constant.OPERATION_RETRY_SLEEP));
}
public boolean canRestorePipeline() {
return jobMaster.isNeedRestore() && getPipelineRestoreNum() <
PIPELINE_MAX_RESTORE_NUM;
}
- private void turnToEndState(@NonNull PipelineStatus endState) {
+ private void turnToEndState(@NonNull PipelineStatus endState) throws
Exception {
synchronized (this) {
// consistency check
- PipelineStatus current = (PipelineStatus)
runningJobStateIMap.get(pipelineLocation);
- if (current.isEndState() && !endState.isEndState()) {
- String message = "Pipeline is trying to leave terminal state "
+ current;
+ if (this.currPipelineStatus.isEndState() &&
!endState.isEndState()) {
+ String message =
+ "Pipeline is trying to leave terminal state " +
this.currPipelineStatus;
LOGGER.severe(message);
throw new IllegalStateException(message);
}
@@ -299,14 +322,27 @@ public class SubPlan {
// we must update runningJobStateTimestampsIMap first and then can
update
// runningJobStateIMap
- updateStateTimestamps(endState);
-
- runningJobStateIMap.set(pipelineLocation, endState);
+ RetryUtils.retryWithException(
+ () -> {
+ updateStateTimestamps(endState);
+ runningJobStateIMap.set(pipelineLocation, endState);
+ return null;
+ },
+ new RetryUtils.RetryMaterial(
+ Constant.OPERATION_RETRY_TIME,
+ true,
+ exception ->
+ exception instanceof
OperationTimeoutException
+ || exception
+ instanceof
HazelcastInstanceNotActiveException
+ || exception instanceof
InterruptedException,
+ Constant.OPERATION_RETRY_SLEEP));
+ this.currPipelineStatus = endState;
}
}
public boolean updatePipelineState(
- @NonNull PipelineStatus current, @NonNull PipelineStatus
targetState) {
+ @NonNull PipelineStatus current, @NonNull PipelineStatus
targetState) throws Exception {
synchronized (this) {
// consistency check
if (current.isEndState()) {
@@ -345,8 +381,23 @@ public class SubPlan {
// we must update runningJobStateTimestampsIMap first and then
can update
// runningJobStateIMap
- updateStateTimestamps(targetState);
- runningJobStateIMap.set(pipelineLocation, targetState);
+ RetryUtils.retryWithException(
+ () -> {
+ updateStateTimestamps(targetState);
+ runningJobStateIMap.set(pipelineLocation,
targetState);
+ return null;
+ },
+ new RetryUtils.RetryMaterial(
+ Constant.OPERATION_RETRY_TIME,
+ true,
+ exception ->
+ exception instanceof
OperationTimeoutException
+ || exception
+ instanceof
+
HazelcastInstanceNotActiveException
+ || exception instanceof
InterruptedException,
+ Constant.OPERATION_RETRY_SLEEP));
+ this.currPipelineStatus = targetState;
return true;
} else {
return false;
@@ -436,15 +487,11 @@ public class SubPlan {
},
executorService);
}
- LOGGER.info(
- String.format(
- "can not cancel task %s because it is in state %s ",
- task.getTaskFullName(), task.getExecutionState()));
return null;
}
/** Before restore a pipeline, the pipeline must do reset */
- private synchronized void reset() {
+ private synchronized void reset() throws Exception {
resetPipelineState();
finishedTaskNum.set(0);
canceledTaskNum.set(0);
@@ -463,23 +510,41 @@ public class SubPlan {
runningJobStateTimestampsIMap.set(pipelineLocation, stateTimestamps);
}
- private void resetPipelineState() {
- PipelineStatus pipelineState = getPipelineState();
- if (!pipelineState.isEndState()) {
- String message =
- String.format(
- "%s reset state failed, only end state can be
reset, current is %s",
- getPipelineFullName(), pipelineState);
- LOGGER.severe(message);
- throw new IllegalStateException(message);
- }
+ private void resetPipelineState() throws Exception {
+ RetryUtils.retryWithException(
+ () -> {
+ PipelineStatus pipelineState = getPipelineState();
+ if (!pipelineState.isEndState()) {
+ String message =
+ String.format(
+ "%s reset state failed, only end state
can be reset, current is %s",
+ getPipelineFullName(), pipelineState);
+ LOGGER.severe(message);
+ throw new IllegalStateException(message);
+ }
- updateStateTimestamps(PipelineStatus.CREATED);
- runningJobStateIMap.set(pipelineLocation, PipelineStatus.CREATED);
+ updateStateTimestamps(PipelineStatus.CREATED);
+ runningJobStateIMap.set(pipelineLocation,
PipelineStatus.CREATED);
+ this.currPipelineStatus = PipelineStatus.CREATED;
+ ;
+ return null;
+ },
+ new RetryUtils.RetryMaterial(
+ Constant.OPERATION_RETRY_TIME,
+ true,
+ exception ->
+ exception instanceof OperationTimeoutException
+ || exception instanceof
HazelcastInstanceNotActiveException
+ || exception instanceof
InterruptedException,
+ Constant.OPERATION_RETRY_SLEEP));
}
- /** restore the pipeline when pipeline failed or canceled by error. */
- public void restorePipeline() {
+ /**
+ * reset the pipeline and task state and init state future again
+ *
+ * @return
+ */
+ private boolean prepareRestorePipeline() {
synchronized (restoreLock) {
try {
pipelineRestoreNum++;
@@ -497,9 +562,24 @@ public class SubPlan {
}
reset();
jobMaster.getPhysicalPlan().addPipelineEndCallback(this);
+ return true;
+ } catch (Throwable e) {
+ if (this.currPipelineStatus.isEndState()) {
+ // restore failed
+ return false;
+ }
+ jobMaster.getPhysicalPlan().addPipelineEndCallback(this);
+ return true;
+ }
+ }
+ }
+
+ /** restore the pipeline when pipeline failed or canceled by error. */
+ public void restorePipeline() {
+ synchronized (restoreLock) {
+ try {
if
(jobMaster.getCheckpointManager().isCompletedPipeline(pipelineId)) {
forcePipelineFinish();
- return;
}
jobMaster.getCheckpointManager().reportedPipelineRunning(pipelineId, false);
reSchedulerPipelineFuture =
jobMaster.reSchedulerPipeline(this);
@@ -560,7 +640,7 @@ public class SubPlan {
}
public PipelineStatus getPipelineState() {
- return (PipelineStatus) runningJobStateIMap.get(pipelineLocation);
+ return this.currPipelineStatus;
}
public PipelineLocation getPipelineLocation() {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
index fc6aff87f..ab5ad8f82 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
@@ -291,8 +291,19 @@ public class PipelineBaseScheduler implements JobScheduler {
private void deployPipeline(
@NonNull SubPlan pipeline, Map<TaskGroupLocation, SlotProfile>
slotProfiles) {
- if (pipeline.updatePipelineState(PipelineStatus.SCHEDULED,
PipelineStatus.DEPLOYING)) {
-
+ boolean changeStateSuccess = false;
+ try {
+ changeStateSuccess =
+ pipeline.updatePipelineState(
+ PipelineStatus.SCHEDULED,
PipelineStatus.DEPLOYING);
+ } catch (Exception e) {
+ log.warn(
+ "{} turn to state {} failed, cancel pipeline",
+ pipeline.getPipelineFullName(),
+ PipelineStatus.DEPLOYING);
+ pipeline.cancelPipeline();
+ }
+ if (changeStateSuccess) {
try {
List<CompletableFuture<?>> deployCoordinatorFuture =
pipeline.getCoordinatorVertexList().stream()