hk-lrzy commented on code in PR #2620:
URL: 
https://github.com/apache/incubator-seatunnel/pull/2620#discussion_r962860414


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java:
##########
@@ -81,33 +91,45 @@ public class CheckpointCoordinator {
      */
     private final Map<Long, Integer> pipelineTasks;
 
+    private final Map<Long, SeaTunnelTaskState> pipelineTaskStatus;
+
     private final CheckpointPlan plan;
 
-    private final Map<Long, PendingCheckpoint> pendingCheckpoints;
+    private final ConcurrentHashMap<Long, PendingCheckpoint> 
pendingCheckpoints;
 
     private final ArrayDeque<CompletedCheckpoint> completedCheckpoints;
 
+    private CompletedCheckpoint latestCompletedCheckpoint;
+
     private final CheckpointCoordinatorConfiguration coordinatorConfig;
 
+    private int tolerableFailureCheckpoints;
     private final transient ScheduledExecutorService scheduler;
 
     private final AtomicLong latestTriggerTimestamp = new AtomicLong(0);
 
+    private final AtomicInteger pendingCounter = new AtomicInteger(0);
+
     private final Object lock = new Object();
+
+    private final Object autoSavepointLock = new Object();
     public CheckpointCoordinator(CheckpointManager manager,
                                  CheckpointStorage checkpointStorage,
                                  CheckpointStorageConfiguration storageConfig,
                                  long jobId,
                                  CheckpointPlan plan,
                                  CheckpointCoordinatorConfiguration 
coordinatorConfig) {
+
         this.checkpointManager = manager;
         this.checkpointStorage = checkpointStorage;
         this.storageConfig = storageConfig;
         this.jobId = jobId;
         this.pipelineId = plan.getPipelineId();
         this.plan = plan;
         this.coordinatorConfig = coordinatorConfig;
-        this.pendingCheckpoints = new LinkedHashMap<>();
+        this.latestCompletedCheckpoint = plan.getRestoredCheckpoint();

Review Comment:
   Is `latestCompletedCheckpoint` always null at the beginning?



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java:
##########
@@ -146,9 +227,47 @@ public static Map<Long, Integer> 
getPipelineTasks(Set<TaskLocation> pipelineSubt
             .collect(Collectors.toMap(Map.Entry::getKey, entry -> 
entry.getValue().size()));
     }
 
-    public void startTriggerPendingCheckpoint(long triggerTimestamp) {
-        CompletableFuture<PendingCheckpoint> completableFuture = new 
CompletableFuture<>();
-        CompletableFuture.supplyAsync(() -> {
+    public PassiveCompletableFuture<PendingCheckpoint> startSavepoint() {
+        CompletableFuture<PendingCheckpoint> savepoint = 
createPendingCheckpoint(Instant.now().toEpochMilli(), 
CheckpointType.SAVEPOINT_TYPE);
+        startTriggerPendingCheckpoint(savepoint);
+        return new PassiveCompletableFuture<>(savepoint);
+    }
+
+    private void 
startTriggerPendingCheckpoint(CompletableFuture<PendingCheckpoint> 
pendingCompletableFuture) {
+        // Trigger the barrier and wait for all tasks to ACK
+        pendingCompletableFuture.thenAcceptAsync(pendingCheckpoint -> {
+            if (CheckpointType.AUTO_SAVEPOINT_TYPE != 
pendingCheckpoint.getCheckpointType()) {
+                LOG.debug("trigger checkpoint barrier" + pendingCheckpoint);
+                CompletableFuture.supplyAsync(() ->
+                        new 
CheckpointBarrier(pendingCheckpoint.getCheckpointId(),
+                            pendingCheckpoint.getCheckpointTimestamp(),
+                            pendingCheckpoint.getCheckpointType()))
+                    .thenApplyAsync(this::triggerCheckpoint)
+                    .thenApplyAsync(invocationFutures -> 
CompletableFuture.allOf(invocationFutures).join());
+            }
+            LOG.debug("wait checkpoint completed: " + pendingCheckpoint);
+            pendingCheckpoint.getCompletableFuture()
+                .thenAcceptAsync(this::completePendingCheckpoint);

Review Comment:
   If any one of these futures completes with an exception, should there be a step that discards the pipeline?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to