This is an automated email from the ASF dual-hosted git repository.

zongwen pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new ef2775d50 [hotfix][Engine][Checkpoint] Fix Checkpoint can't be accepted after job retry. (#3704)
ef2775d50 is described below

commit ef2775d50ccbd96011935caaa9b5f0d3e176f134
Author: Hisoka <[email protected]>
AuthorDate: Mon Dec 12 18:04:50 2022 +0800

    [hotfix][Engine][Checkpoint] Fix Checkpoint can't be accepted after job retry. (#3704)
---
 .../server/checkpoint/CheckpointCoordinator.java       | 18 ++++++++++++------
 .../server/checkpoint/CheckpointFailureReason.java     |  3 ++-
 .../engine/server/checkpoint/CheckpointManager.java    |  4 ++--
 .../seatunnel/engine/server/master/JobMaster.java      |  4 ++--
 .../seatunnel/engine/server/master/JobMasterTest.java  |  4 +++-
 5 files changed, 21 insertions(+), 12 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index debb6d777..8e404a571 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -108,6 +108,8 @@ public class CheckpointCoordinator {
 
     private volatile CompletedCheckpoint latestCompletedCheckpoint = null;
 
+    private volatile CheckpointType latestAcceptedCheckpoint = null;
+
     private final CheckpointConfig coordinatorConfig;
 
     private int tolerableFailureCheckpoints;
@@ -119,7 +121,9 @@ public class CheckpointCoordinator {
 
     private final Object lock = new Object();
 
-    /** Flag marking the coordinator as shut down (not accepting any messages 
any more). */
+    /**
+     * Flag marking the coordinator as shut down (not accepting any messages 
anymore).
+     */
     private volatile boolean shutdown;
 
     @Setter
@@ -257,7 +261,6 @@ public class CheckpointCoordinator {
                     return;
                 }
             } else {
-                shutdown = true;
                 waitingPendingCheckpointDone();
             }
             CompletableFuture<PendingCheckpoint> pendingCheckpoint = 
createPendingCheckpoint(currentTimestamp, checkpointType);
@@ -304,14 +307,16 @@ public class CheckpointCoordinator {
         pendingCompletableFuture.thenAcceptAsync(pendingCheckpoint -> {
             LOG.info("wait checkpoint completed: " + 
pendingCheckpoint.getCheckpointId());
             PassiveCompletableFuture<CompletedCheckpoint> completableFuture = 
pendingCheckpoint.getCompletableFuture();
-            completableFuture.whenCompleteAsync((completedCheckpoint, error) 
-> {
+            completableFuture.whenComplete((completedCheckpoint, error) -> {
                 if (error != null) {
                     LOG.error("trigger checkpoint failed", error);
+                    checkpointManager.handleCheckpointError(pipelineId, new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_INSIDE_ERROR));
                 } else {
                     try {
                         completePendingCheckpoint(completedCheckpoint);
                     } catch (Throwable e) {
                         LOG.error("complete checkpoint failed", e);
+                        checkpointManager.handleCheckpointError(pipelineId, 
new CheckpointException(CheckpointFailureReason.CHECKPOINT_INSIDE_ERROR));
                     }
                 }
             });
@@ -339,7 +344,7 @@ public class CheckpointCoordinator {
                     if 
(pendingCheckpoints.get(pendingCheckpoint.getCheckpointId()) != null && 
!pendingCheckpoint.isFullyAcknowledged()) {
                         if (tolerableFailureCheckpoints-- <= 0) {
                             
cleanPendingCheckpoint(CheckpointFailureReason.CHECKPOINT_EXPIRED);
-                            
checkpointManager.handleCheckpointTimeout(pipelineId);
+                            
checkpointManager.handleCheckpointError(pipelineId, new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_EXPIRED));
                         }
                     }
                 }, coordinatorConfig.getCheckpointTimeout(),
@@ -474,8 +479,8 @@ public class CheckpointCoordinator {
 
     public void completePendingCheckpoint(CompletedCheckpoint 
completedCheckpoint) {
         LOG.debug("pending checkpoint({}/{}@{}) completed! cost: {}, trigger: 
{}, completed: {}",
-                completedCheckpoint.getCheckpointId(), 
completedCheckpoint.getPipelineId(), completedCheckpoint.getJobId(),
-                completedCheckpoint.getCompletedTimestamp() - 
completedCheckpoint.getCheckpointTimestamp(), 
completedCheckpoint.getCheckpointTimestamp(), 
completedCheckpoint.getCompletedTimestamp());
+            completedCheckpoint.getCheckpointId(), 
completedCheckpoint.getPipelineId(), completedCheckpoint.getJobId(),
+            completedCheckpoint.getCompletedTimestamp() - 
completedCheckpoint.getCheckpointTimestamp(), 
completedCheckpoint.getCheckpointTimestamp(), 
completedCheckpoint.getCompletedTimestamp());
         pendingCounter.decrementAndGet();
         final long checkpointId = completedCheckpoint.getCheckpointId();
         pendingCheckpoints.remove(checkpointId);
@@ -509,6 +514,7 @@ public class CheckpointCoordinator {
         // TODO: notifyCheckpointCompleted fail
         latestCompletedCheckpoint = completedCheckpoint;
         if (isCompleted()) {
+            shutdown = true;
             
cleanPendingCheckpoint(CheckpointFailureReason.CHECKPOINT_COORDINATOR_COMPLETED);
         }
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointFailureReason.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointFailureReason.java
index 703e6e1d7..ccd121f74 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointFailureReason.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointFailureReason.java
@@ -22,7 +22,8 @@ public enum CheckpointFailureReason {
     PIPELINE_END("Pipeline turn to end state."),
     CHECKPOINT_EXPIRED("Checkpoint expired before completing."),
     CHECKPOINT_COORDINATOR_COMPLETED("CheckpointCoordinator completed."),
-    CHECKPOINT_COORDINATOR_SHUTDOWN("CheckpointCoordinator shutdown.");
+    CHECKPOINT_COORDINATOR_SHUTDOWN("CheckpointCoordinator shutdown."),
+    CHECKPOINT_INSIDE_ERROR("CheckpointCoordinator inside have error.");
 
     private final String message;
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index 704bbd740..9733e2f40 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -140,8 +140,8 @@ public class CheckpointManager {
         getCheckpointCoordinator(pipelineId).tryTriggerPendingCheckpoint();
     }
 
-    protected void handleCheckpointTimeout(int pipelineId) {
-        jobMaster.handleCheckpointTimeout(pipelineId);
+    protected void handleCheckpointError(int pipelineId, Throwable e) {
+        jobMaster.handleCheckpointError(pipelineId, e);
     }
 
     private CheckpointCoordinator getCheckpointCoordinator(TaskLocation 
taskLocation) {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index b0b91d1bc..9ed17159f 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -237,11 +237,11 @@ public class JobMaster extends Thread {
         }
     }
 
-    public void handleCheckpointTimeout(long pipelineId) {
+    public void handleCheckpointError(long pipelineId, Throwable e) {
         this.physicalPlan.getPipelineList().forEach(pipeline -> {
             if (pipeline.getPipelineLocation().getPipelineId() == pipelineId) {
                 LOGGER.warning(
-                    String.format("%s checkpoint timeout, cancel the 
pipeline", pipeline.getPipelineFullName()));
+                    String.format("%s checkpoint have error, cancel the 
pipeline", pipeline.getPipelineFullName()), e);
                 pipeline.cancelPipeline();
             }
         });
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
index 606d1b9a0..a7ba48ce6 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
@@ -28,6 +28,8 @@ import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.core.job.PipelineStatus;
 import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
 import org.apache.seatunnel.engine.server.TestUtils;
+import org.apache.seatunnel.engine.server.checkpoint.CheckpointException;
+import org.apache.seatunnel.engine.server.checkpoint.CheckpointFailureReason;
 import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
 import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
 import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
@@ -121,7 +123,7 @@ public class JobMasterTest extends 
AbstractSeaTunnelServerTest {
             .untilAsserted(() -> Assertions.assertEquals(JobStatus.RUNNING, 
jobMaster.getJobStatus()));
 
         // call checkpoint timeout
-        jobMaster.handleCheckpointTimeout(1);
+        jobMaster.handleCheckpointError(1, new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_EXPIRED));
 
         // Because handleCheckpointTimeout is an async method, so we need 
sleep 5s to waiting job status become running again
         Thread.sleep(5000);

Reply via email to