ic4y commented on code in PR #2620:
URL: 
https://github.com/apache/incubator-seatunnel/pull/2620#discussion_r962679538


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java:
##########
@@ -200,33 +326,62 @@ private Map<Long, TaskStatistics> getTaskStatistics() {
     }
 
     public InvocationFuture<?>[] triggerCheckpoint(CheckpointBarrier 
checkpointBarrier) {
+        // TODO: some tasks have completed and don't need to trigger
         return plan.getStartingSubtasks()
             .stream()
-            .map(taskLocation -> new 
CheckpointTriggerOperation(checkpointBarrier, taskLocation))
-            .map(checkpointManager::triggerCheckpoint)
+            .map(taskLocation -> new 
CheckpointBarrierTriggerOperation(checkpointBarrier, taskLocation))
+            .map(checkpointManager::sendOperationToMemberNode)
             .toArray(InvocationFuture[]::new);
     }
 
+    protected void cleanPendingCheckpoint() {
+        // TODO: clear related future & scheduler task
+        pendingCheckpoints.clear();
+    }
+
     protected void acknowledgeTask(TaskAcknowledgeOperation ackOperation) {
-        final long checkpointId = ackOperation.getCheckpointId();
+        final long checkpointId = ackOperation.getBarrier().getId();
         final PendingCheckpoint pendingCheckpoint = 
pendingCheckpoints.get(checkpointId);
-        if (pendingCheckpoint == null) {
-            LOG.debug("job: {}, pipeline: {}, the checkpoint({}) don't 
exist.", jobId, pipelineId, checkpointId);
+        TaskLocation location = ackOperation.getTaskLocation();
+        LOG.debug("task[{}]({}/{}) ack. {}", location.getTaskID(), 
location.getPipelineId(), location.getJobId(), 
ackOperation.getBarrier().toString());
+        if (checkpointId == Barrier.PREPARE_CLOSE_BARRIER_ID) {
+            synchronized (autoSavepointLock) {
+                if (pendingCheckpoints.get(checkpointId) == null) {
+                    CompletableFuture<PendingCheckpoint> future = 
createPendingCheckpoint(
+                        Instant.now().toEpochMilli(),
+                        
CompletableFuture.completedFuture(Barrier.PREPARE_CLOSE_BARRIER_ID),
+                        CheckpointType.AUTO_SAVEPOINT_TYPE);
+                    startTriggerPendingCheckpoint(future);
+                    future.join();
+                }
+            }
+            pendingCheckpoints.values().parallelStream()
+                .forEach(cp -> 
cp.acknowledgeTask(ackOperation.getTaskLocation(), ackOperation.getStates(), 
SubtaskStatus.AUTO_PREPARE_CLOSE));
+            return;
+        } else if (pendingCheckpoint == null) {
+            LOG.info("job: {}, pipeline: {}, the checkpoint({}) don't exist.", 
jobId, pipelineId, checkpointId);
             return;
         }
-        pendingCheckpoint.acknowledgeTask(ackOperation.getTaskLocation(), 
ackOperation.getStates());
+
+        pendingCheckpoint.acknowledgeTask(location, ackOperation.getStates(),
+            CheckpointType.SAVEPOINT_TYPE == 
pendingCheckpoint.getCheckpointType() ?
+                SubtaskStatus.SAVEPOINT_PREPARE_CLOSE :
+                SubtaskStatus.RUNNING);
     }
 
     public void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) 
{
+        LOG.info("pending checkpoint({}/{}@{}) completed!", 
pendingCheckpoint.getCheckpointId(), pendingCheckpoint.getPipelineId(), 
pendingCheckpoint.getJobId());
+        pendingCounter.decrementAndGet();
         final long checkpointId = pendingCheckpoint.getCheckpointId();
-        InvocationFuture<?>[] invocationFutures = 
notifyCheckpointCompleted(pendingCheckpoint.getCheckpointId());
+        InvocationFuture<?>[] invocationFutures = 
notifyCheckpointCompleted(checkpointId);
         CompletableFuture.allOf(invocationFutures).join();
         CompletedCheckpoint completedCheckpoint = 
pendingCheckpoint.toCompletedCheckpoint();
         pendingCheckpoints.remove(checkpointId);
         if (pendingCheckpoints.size() + 1 == 
coordinatorConfig.getMaxConcurrentCheckpoints()) {
             // latest checkpoint completed time > checkpoint interval
             tryTriggerPendingCheckpoint();
         }
+        latestCompletedCheckpoint = completedCheckpoint;
         completedCheckpoints.addLast(completedCheckpoint);
         try {
             byte[] states = serializer.serialize(completedCheckpoint);

Review Comment:
   > This place first called back task the checkpoint successfully, and then 
writes the checkpoint state to the file system. If the callback Task checkpoint 
succeeds, but the node fails without writing to the file system. Will 
restarting the recovery at this time cause data duplication?
   
   This is when the connector supports exactly-once. I think of a distributed 
snapshot as a two-phase commit. The final commit is based on the persistent 
state. That is to say, the checkpoint state should be written to the file 
system first, and then call back task the checkpoint successfully



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to