[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15941700#comment-15941700 ]
ASF GitHub Bot commented on FLINK-3257:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1668#discussion_r108034229

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java ---
    @@ -64,6 +70,22 @@ public void init() throws Exception {
             super.init();
         }

    +    @Override
    +    protected boolean performCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
    +        LOG.debug("Starting checkpoint {} on task {}", checkpointMetaData.getCheckpointId(), getName());
    +
    +        synchronized (getCheckpointLock()) {
    +            if (isRunning()) {
    +                dataChannel.put(new Either.Right(new CheckpointBarrier(checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp(), checkpointOptions)));
    +                getEnvironment().acknowledgeCheckpoint(checkpointMetaData.getCheckpointId(), checkpointMetrics);
    --- End diff --

    Can the `IterationTailTask` contain operators as well, or is it always a task without operators? If it has operators, we cannot immediately acknowledge here, but need to delegate to superclass checkpoint method instead.


> Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
> -------------------------------------------------------------------
>
>                 Key: FLINK-3257
>                 URL: https://issues.apache.org/jira/browse/FLINK-3257
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>            Reporter: Paris Carbone
>            Assignee: Paris Carbone
>
> The current snapshotting algorithm cannot support cycles in the execution
> graph. An alternative scheme can potentially include records in-transit
> through the back-edges of a cyclic execution graph (ABS [1]) to achieve the
> same guarantees.
> One straightforward implementation of ABS for cyclic graphs can work as
> follows along the lines:
> 1) Upon triggering a barrier in an IterationHead from the TaskManager start
> block output and start upstream backup of all records forwarded from the
> respective IterationSink.
> 2) The IterationSink should eventually forward the current snapshotting epoch
> barrier to the IterationSource.
> 3) Upon receiving a barrier from the IterationSink, the IterationSource
> should finalize the snapshot, unblock its output and emit all records
> in-transit in FIFO order and continue the usual execution.
> --
> Upon restart the IterationSource should emit all records from the injected
> snapshot first and then continue its usual execution.
> Several optimisations and slight variations can be potentially achieved but
> this can be the initial implementation take.
> [1] http://arxiv.org/abs/1506.08603



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
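
For illustration, steps 1) to 3) and the restart rule from the quoted issue description can be sketched as a small stand-alone Java model. This is only a rough sketch under stated assumptions, not the code from the pull request: the class `IterationHeadSketch` and all of its methods are hypothetical, records are plain strings, the output is an in-memory list, and "IterationHead" and "IterationSource" are assumed to be the same component, fed by the IterationSink over the back-edge.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/**
 * Hypothetical, simplified model of an iteration head (source side of the
 * back-edge). None of these names are Flink classes; they are assumptions
 * made for this sketch only.
 */
class IterationHeadSketch {
    // Records that arrived over the back-edge while a snapshot is in progress.
    private final Deque<String> backupLog = new ArrayDeque<>();
    // Stands in for the downstream output of the head.
    private final List<String> output = new ArrayList<>();
    private boolean snapshotting = false;
    private long currentCheckpointId = -1L;

    /** Step 1: a barrier is triggered; block output and start the backup log. */
    void triggerCheckpoint(long checkpointId) {
        snapshotting = true;
        currentCheckpointId = checkpointId;
        backupLog.clear();
    }

    /** A record forwarded by the iteration sink over the back-edge. */
    void onFeedbackRecord(String record) {
        if (snapshotting) {
            backupLog.addLast(record); // output is blocked: log instead of emitting
        } else {
            output.add(record);
        }
    }

    /**
     * Steps 2 and 3: the sink has forwarded the barrier of the current epoch.
     * Finalize the snapshot, unblock, and emit the logged records in FIFO order.
     * Returns the logged records, which form the snapshot of the in-transit data.
     */
    List<String> onFeedbackBarrier(long checkpointId) {
        if (!snapshotting || checkpointId != currentCheckpointId) {
            return new ArrayList<>(); // stale or unexpected barrier: nothing to do
        }
        List<String> snapshot = new ArrayList<>(backupLog);
        output.addAll(backupLog); // ArrayDeque iterates head to tail, i.e. FIFO
        backupLog.clear();
        snapshotting = false;
        return snapshot;
    }

    /** Restart: emit all records from the injected snapshot first. */
    void restore(List<String> snapshot) {
        output.addAll(snapshot);
    }

    List<String> getOutput() {
        return output;
    }
}
```

In this model, a record received before `triggerCheckpoint` is emitted directly, records received between the trigger and `onFeedbackBarrier` are held back and become the snapshot, and a restored instance replays that snapshot before any new record. Acknowledging the checkpoint and any operator state are deliberately left out of the sketch.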