ashulin commented on code in PR #2620:
URL:
https://github.com/apache/incubator-seatunnel/pull/2620#discussion_r962743098
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java:
##########
@@ -200,33 +326,62 @@ private Map<Long, TaskStatistics> getTaskStatistics() {
}
public InvocationFuture<?>[] triggerCheckpoint(CheckpointBarrier
checkpointBarrier) {
+ // TODO: some tasks have completed and don't need to trigger
return plan.getStartingSubtasks()
.stream()
- .map(taskLocation -> new
CheckpointTriggerOperation(checkpointBarrier, taskLocation))
- .map(checkpointManager::triggerCheckpoint)
+ .map(taskLocation -> new
CheckpointBarrierTriggerOperation(checkpointBarrier, taskLocation))
+ .map(checkpointManager::sendOperationToMemberNode)
.toArray(InvocationFuture[]::new);
}
+ protected void cleanPendingCheckpoint() {
+ // TODO: clear related future & scheduler task
+ pendingCheckpoints.clear();
+ }
+
protected void acknowledgeTask(TaskAcknowledgeOperation ackOperation) {
- final long checkpointId = ackOperation.getCheckpointId();
+ final long checkpointId = ackOperation.getBarrier().getId();
final PendingCheckpoint pendingCheckpoint =
pendingCheckpoints.get(checkpointId);
- if (pendingCheckpoint == null) {
- LOG.debug("job: {}, pipeline: {}, the checkpoint({}) don't
exist.", jobId, pipelineId, checkpointId);
+ TaskLocation location = ackOperation.getTaskLocation();
+ LOG.debug("task[{}]({}/{}) ack. {}", location.getTaskID(),
location.getPipelineId(), location.getJobId(),
ackOperation.getBarrier().toString());
+ if (checkpointId == Barrier.PREPARE_CLOSE_BARRIER_ID) {
+ synchronized (autoSavepointLock) {
+ if (pendingCheckpoints.get(checkpointId) == null) {
+ CompletableFuture<PendingCheckpoint> future =
createPendingCheckpoint(
+ Instant.now().toEpochMilli(),
+
CompletableFuture.completedFuture(Barrier.PREPARE_CLOSE_BARRIER_ID),
+ CheckpointType.AUTO_SAVEPOINT_TYPE);
+ startTriggerPendingCheckpoint(future);
+ future.join();
+ }
+ }
+ pendingCheckpoints.values().parallelStream()
+ .forEach(cp ->
cp.acknowledgeTask(ackOperation.getTaskLocation(), ackOperation.getStates(),
SubtaskStatus.AUTO_PREPARE_CLOSE));
+ return;
+ } else if (pendingCheckpoint == null) {
+ LOG.info("job: {}, pipeline: {}, the checkpoint({}) don't exist.",
jobId, pipelineId, checkpointId);
return;
}
- pendingCheckpoint.acknowledgeTask(ackOperation.getTaskLocation(),
ackOperation.getStates());
+
+ pendingCheckpoint.acknowledgeTask(location, ackOperation.getStates(),
+ CheckpointType.SAVEPOINT_TYPE ==
pendingCheckpoint.getCheckpointType() ?
+ SubtaskStatus.SAVEPOINT_PREPARE_CLOSE :
+ SubtaskStatus.RUNNING);
}
public void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint)
{
+ LOG.info("pending checkpoint({}/{}@{}) completed!",
pendingCheckpoint.getCheckpointId(), pendingCheckpoint.getPipelineId(),
pendingCheckpoint.getJobId());
+ pendingCounter.decrementAndGet();
final long checkpointId = pendingCheckpoint.getCheckpointId();
- InvocationFuture<?>[] invocationFutures =
notifyCheckpointCompleted(pendingCheckpoint.getCheckpointId());
+ InvocationFuture<?>[] invocationFutures =
notifyCheckpointCompleted(checkpointId);
CompletableFuture.allOf(invocationFutures).join();
CompletedCheckpoint completedCheckpoint =
pendingCheckpoint.toCompletedCheckpoint();
pendingCheckpoints.remove(checkpointId);
if (pendingCheckpoints.size() + 1 ==
coordinatorConfig.getMaxConcurrentCheckpoints()) {
// latest checkpoint completed time > checkpoint interval
tryTriggerPendingCheckpoint();
}
+ latestCompletedCheckpoint = completedCheckpoint;
completedCheckpoints.addLast(completedCheckpoint);
try {
byte[] states = serializer.serialize(completedCheckpoint);
Review Comment:
Thanks, I got it. If the notification succeeds but saving the
CompletedCheckpoint fails, the 2PC sink will end up writing the same data
repeatedly.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]