ashulin commented on code in PR #2620:
URL:
https://github.com/apache/incubator-seatunnel/pull/2620#discussion_r963009694
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java:
##########
@@ -81,33 +91,45 @@ public class CheckpointCoordinator {
*/
private final Map<Long, Integer> pipelineTasks;
+ private final Map<Long, SeaTunnelTaskState> pipelineTaskStatus;
+
private final CheckpointPlan plan;
- private final Map<Long, PendingCheckpoint> pendingCheckpoints;
+ private final ConcurrentHashMap<Long, PendingCheckpoint>
pendingCheckpoints;
private final ArrayDeque<CompletedCheckpoint> completedCheckpoints;
+ private CompletedCheckpoint latestCompletedCheckpoint;
+
private final CheckpointCoordinatorConfiguration coordinatorConfig;
+ private int tolerableFailureCheckpoints;
private final transient ScheduledExecutorService scheduler;
private final AtomicLong latestTriggerTimestamp = new AtomicLong(0);
+ private final AtomicInteger pendingCounter = new AtomicInteger(0);
+
private final Object lock = new Object();
+
+ private final Object autoSavepointLock = new Object();
public CheckpointCoordinator(CheckpointManager manager,
CheckpointStorage checkpointStorage,
CheckpointStorageConfiguration storageConfig,
long jobId,
CheckpointPlan plan,
CheckpointCoordinatorConfiguration
coordinatorConfig) {
+
this.checkpointManager = manager;
this.checkpointStorage = checkpointStorage;
this.storageConfig = storageConfig;
this.jobId = jobId;
this.pipelineId = plan.getPipelineId();
this.plan = plan;
this.coordinatorConfig = coordinatorConfig;
- this.pendingCheckpoints = new LinkedHashMap<>();
+ this.latestCompletedCheckpoint = plan.getRestoredCheckpoint();
Review Comment:
This part requires a change to the current checkpoint storage, which will be
done in another PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]