This is an automated email from the ASF dual-hosted git repository.
zongwen pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new ef2775d50 [hotfix][Engine][Checkpoint] Fix Checkpoint can't be
accepted after job retry. (#3704)
ef2775d50 is described below
commit ef2775d50ccbd96011935caaa9b5f0d3e176f134
Author: Hisoka <[email protected]>
AuthorDate: Mon Dec 12 18:04:50 2022 +0800
[hotfix][Engine][Checkpoint] Fix Checkpoint can't be accepted after job
retry. (#3704)
---
.../server/checkpoint/CheckpointCoordinator.java | 18 ++++++++++++------
.../server/checkpoint/CheckpointFailureReason.java | 3 ++-
.../engine/server/checkpoint/CheckpointManager.java | 4 ++--
.../seatunnel/engine/server/master/JobMaster.java | 4 ++--
.../seatunnel/engine/server/master/JobMasterTest.java | 4 +++-
5 files changed, 21 insertions(+), 12 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index debb6d777..8e404a571 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -108,6 +108,8 @@ public class CheckpointCoordinator {
private volatile CompletedCheckpoint latestCompletedCheckpoint = null;
+ private volatile CheckpointType latestAcceptedCheckpoint = null;
+
private final CheckpointConfig coordinatorConfig;
private int tolerableFailureCheckpoints;
@@ -119,7 +121,9 @@ public class CheckpointCoordinator {
private final Object lock = new Object();
- /** Flag marking the coordinator as shut down (not accepting any messages
any more). */
+ /**
+ * Flag marking the coordinator as shut down (not accepting any messages
anymore).
+ */
private volatile boolean shutdown;
@Setter
@@ -257,7 +261,6 @@ public class CheckpointCoordinator {
return;
}
} else {
- shutdown = true;
waitingPendingCheckpointDone();
}
CompletableFuture<PendingCheckpoint> pendingCheckpoint =
createPendingCheckpoint(currentTimestamp, checkpointType);
@@ -304,14 +307,16 @@ public class CheckpointCoordinator {
pendingCompletableFuture.thenAcceptAsync(pendingCheckpoint -> {
LOG.info("wait checkpoint completed: " +
pendingCheckpoint.getCheckpointId());
PassiveCompletableFuture<CompletedCheckpoint> completableFuture =
pendingCheckpoint.getCompletableFuture();
- completableFuture.whenCompleteAsync((completedCheckpoint, error)
-> {
+ completableFuture.whenComplete((completedCheckpoint, error) -> {
if (error != null) {
LOG.error("trigger checkpoint failed", error);
+ checkpointManager.handleCheckpointError(pipelineId, new
CheckpointException(CheckpointFailureReason.CHECKPOINT_INSIDE_ERROR));
} else {
try {
completePendingCheckpoint(completedCheckpoint);
} catch (Throwable e) {
LOG.error("complete checkpoint failed", e);
+ checkpointManager.handleCheckpointError(pipelineId,
new CheckpointException(CheckpointFailureReason.CHECKPOINT_INSIDE_ERROR));
}
}
});
@@ -339,7 +344,7 @@ public class CheckpointCoordinator {
if
(pendingCheckpoints.get(pendingCheckpoint.getCheckpointId()) != null &&
!pendingCheckpoint.isFullyAcknowledged()) {
if (tolerableFailureCheckpoints-- <= 0) {
cleanPendingCheckpoint(CheckpointFailureReason.CHECKPOINT_EXPIRED);
-
checkpointManager.handleCheckpointTimeout(pipelineId);
+
checkpointManager.handleCheckpointError(pipelineId, new
CheckpointException(CheckpointFailureReason.CHECKPOINT_EXPIRED));
}
}
}, coordinatorConfig.getCheckpointTimeout(),
@@ -474,8 +479,8 @@ public class CheckpointCoordinator {
public void completePendingCheckpoint(CompletedCheckpoint
completedCheckpoint) {
LOG.debug("pending checkpoint({}/{}@{}) completed! cost: {}, trigger:
{}, completed: {}",
- completedCheckpoint.getCheckpointId(),
completedCheckpoint.getPipelineId(), completedCheckpoint.getJobId(),
- completedCheckpoint.getCompletedTimestamp() -
completedCheckpoint.getCheckpointTimestamp(),
completedCheckpoint.getCheckpointTimestamp(),
completedCheckpoint.getCompletedTimestamp());
+ completedCheckpoint.getCheckpointId(),
completedCheckpoint.getPipelineId(), completedCheckpoint.getJobId(),
+ completedCheckpoint.getCompletedTimestamp() -
completedCheckpoint.getCheckpointTimestamp(),
completedCheckpoint.getCheckpointTimestamp(),
completedCheckpoint.getCompletedTimestamp());
pendingCounter.decrementAndGet();
final long checkpointId = completedCheckpoint.getCheckpointId();
pendingCheckpoints.remove(checkpointId);
@@ -509,6 +514,7 @@ public class CheckpointCoordinator {
// TODO: notifyCheckpointCompleted fail
latestCompletedCheckpoint = completedCheckpoint;
if (isCompleted()) {
+ shutdown = true;
cleanPendingCheckpoint(CheckpointFailureReason.CHECKPOINT_COORDINATOR_COMPLETED);
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointFailureReason.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointFailureReason.java
index 703e6e1d7..ccd121f74 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointFailureReason.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointFailureReason.java
@@ -22,7 +22,8 @@ public enum CheckpointFailureReason {
PIPELINE_END("Pipeline turn to end state."),
CHECKPOINT_EXPIRED("Checkpoint expired before completing."),
CHECKPOINT_COORDINATOR_COMPLETED("CheckpointCoordinator completed."),
- CHECKPOINT_COORDINATOR_SHUTDOWN("CheckpointCoordinator shutdown.");
+ CHECKPOINT_COORDINATOR_SHUTDOWN("CheckpointCoordinator shutdown."),
+ CHECKPOINT_INSIDE_ERROR("CheckpointCoordinator inside have error.");
private final String message;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index 704bbd740..9733e2f40 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -140,8 +140,8 @@ public class CheckpointManager {
getCheckpointCoordinator(pipelineId).tryTriggerPendingCheckpoint();
}
- protected void handleCheckpointTimeout(int pipelineId) {
- jobMaster.handleCheckpointTimeout(pipelineId);
+ protected void handleCheckpointError(int pipelineId, Throwable e) {
+ jobMaster.handleCheckpointError(pipelineId, e);
}
private CheckpointCoordinator getCheckpointCoordinator(TaskLocation
taskLocation) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index b0b91d1bc..9ed17159f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -237,11 +237,11 @@ public class JobMaster extends Thread {
}
}
- public void handleCheckpointTimeout(long pipelineId) {
+ public void handleCheckpointError(long pipelineId, Throwable e) {
this.physicalPlan.getPipelineList().forEach(pipeline -> {
if (pipeline.getPipelineLocation().getPipelineId() == pipelineId) {
LOGGER.warning(
- String.format("%s checkpoint timeout, cancel the
pipeline", pipeline.getPipelineFullName()));
+ String.format("%s checkpoint have error, cancel the
pipeline", pipeline.getPipelineFullName()), e);
pipeline.cancelPipeline();
}
});
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
index 606d1b9a0..a7ba48ce6 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
@@ -28,6 +28,8 @@ import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.core.job.PipelineStatus;
import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
import org.apache.seatunnel.engine.server.TestUtils;
+import org.apache.seatunnel.engine.server.checkpoint.CheckpointException;
+import org.apache.seatunnel.engine.server.checkpoint.CheckpointFailureReason;
import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
@@ -121,7 +123,7 @@ public class JobMasterTest extends AbstractSeaTunnelServerTest {
.untilAsserted(() -> Assertions.assertEquals(JobStatus.RUNNING,
jobMaster.getJobStatus()));
// call checkpoint timeout
- jobMaster.handleCheckpointTimeout(1);
+ jobMaster.handleCheckpointError(1, new
CheckpointException(CheckpointFailureReason.CHECKPOINT_EXPIRED));
// Because handleCheckpointTimeout is an async method, so we need
sleep 5s to waiting job status become running again
Thread.sleep(5000);