[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15941700#comment-15941700 ]

ASF GitHub Bot commented on FLINK-3257:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1668#discussion_r108034229
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java ---
    @@ -64,6 +70,22 @@ public void init() throws Exception {
                super.init();
        }
     
    +   @Override
    +   protected boolean performCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
    +           LOG.debug("Starting checkpoint {} on task {}", checkpointMetaData.getCheckpointId(), getName());
    +
    +           synchronized (getCheckpointLock()) {
    +                   if (isRunning()) {
    +                           dataChannel.put(new Either.Right(new CheckpointBarrier(checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp(), checkpointOptions)));
    +                           getEnvironment().acknowledgeCheckpoint(checkpointMetaData.getCheckpointId(), checkpointMetrics);
    --- End diff --
    
    Can the `IterationTailTask` contain operators as well, or is it always a 
task without operators? If it has operators, we cannot acknowledge immediately 
here; we need to delegate to the superclass checkpoint method instead.
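
To make the concern concrete, here is a minimal sketch of the delegating variant. It does not use Flink's classes: `TaskSketch`, `IterationTailSketch`, `addOperator` and the string log are hypothetical stand-ins, and the real `performCheckpoint` signature is the one shown in the diff above. The only point is the ordering: forward the barrier over the back-edge, then let the superclass snapshot any operators before it acknowledges.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a stream task that may own operators. */
class TaskSketch {
    private final List<String> operators = new ArrayList<>();
    private final List<String> log = new ArrayList<>();

    void addOperator(String name) { operators.add(name); }

    List<String> log() { return log; }

    void record(String entry) { log.add(entry); }

    /** Default path: snapshot every operator first, acknowledge last. */
    protected boolean performCheckpoint(long checkpointId) {
        for (String op : operators) {
            record("snapshot " + op + " @" + checkpointId);
        }
        record("ack " + checkpointId);
        return true;
    }
}

/** Hypothetical iteration tail that delegates instead of acknowledging itself. */
class IterationTailSketch extends TaskSketch {
    @Override
    protected boolean performCheckpoint(long checkpointId) {
        // Forward the barrier to the iteration head over the feedback channel.
        record("forward barrier " + checkpointId);
        // Delegate: with no operators this acknowledges right away,
        // with operators it snapshots them before acknowledging.
        return super.performCheckpoint(checkpointId);
    }
}

class IterationTailDemo {
    public static void main(String[] args) {
        IterationTailSketch tail = new IterationTailSketch();
        tail.addOperator("map");
        tail.performCheckpoint(7L);
        tail.log().forEach(System.out::println);
    }
}
```

With this shape the tail behaves the same as the immediate acknowledgement when it has no operators, and stays correct if operators are ever chained into it.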


> Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
> -------------------------------------------------------------------
>
>                 Key: FLINK-3257
>                 URL: https://issues.apache.org/jira/browse/FLINK-3257
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>            Reporter: Paris Carbone
>            Assignee: Paris Carbone
>
> The current snapshotting algorithm cannot support cycles in the execution 
> graph. An alternative scheme (ABS [1]) can potentially include the records 
> in-transit through the back-edges of a cyclic execution graph to achieve the 
> same guarantees.
> One straightforward implementation of ABS for cyclic graphs could work as 
> follows:
> 1) Upon triggering a barrier in an IterationHead from the TaskManager, block 
> its output and start upstream backup of all records forwarded from the 
> respective IterationSink.
> 2) The IterationSink should eventually forward the current snapshotting epoch 
> barrier to the IterationSource.
> 3) Upon receiving a barrier from the IterationSink, the IterationSource 
> should finalize the snapshot, unblock its output and emit all records 
> in-transit in FIFO order and continue the usual execution.
> --
> Upon restart, the IterationSource should first emit all records from the 
> injected snapshot and then continue its usual execution.
> Several optimisations and slight variations are possible, but this can serve 
> as the initial implementation.
> [1] http://arxiv.org/abs/1506.08603
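
The steps in the description above can be sketched as a small single-threaded model. This is only an illustration under stated assumptions, not Flink code: `FeedbackEvent`, `IterationHeadSketch` and all method names are hypothetical, records are plain strings, the "output" is an in-memory list, and emitting the barrier to downstream operators is omitted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical element on the back-edge: either a record or a barrier. */
final class FeedbackEvent {
    final String record;      // null when this event is a barrier
    final long checkpointId;  // only meaningful for barriers

    private FeedbackEvent(String record, long checkpointId) {
        this.record = record;
        this.checkpointId = checkpointId;
    }

    static FeedbackEvent record(String value) { return new FeedbackEvent(value, -1L); }

    static FeedbackEvent barrier(long id) { return new FeedbackEvent(null, id); }

    boolean isBarrier() { return record == null; }
}

/** Hypothetical iteration head that backs up in-transit feedback records. */
final class IterationHeadSketch {
    private final List<String> output = new ArrayList<>();
    private final Queue<String> inTransitLog = new ArrayDeque<>();
    private List<String> lastSnapshot = new ArrayList<>();
    private long pendingCheckpointId = -1L;

    /** Step 1: block the output and start backing up feedback records. */
    void triggerCheckpoint(long checkpointId) {
        pendingCheckpointId = checkpointId;
        inTransitLog.clear();
    }

    void onFeedback(FeedbackEvent event) {
        if (event.isBarrier()) {
            if (event.checkpointId == pendingCheckpointId) {
                // Step 3: finalize the snapshot, unblock, replay in FIFO order.
                lastSnapshot = new ArrayList<>(inTransitLog);
                pendingCheckpointId = -1L;
                while (!inTransitLog.isEmpty()) {
                    output.add(inTransitLog.poll());
                }
            }
        } else if (pendingCheckpointId >= 0) {
            inTransitLog.add(event.record);   // output is blocked: back up only
        } else {
            output.add(event.record);         // usual execution
        }
    }

    /** Restart: emit the snapshotted records before any new input. */
    void restore(List<String> snapshot) { output.addAll(snapshot); }

    List<String> output() { return output; }

    List<String> lastSnapshot() { return lastSnapshot; }
}

class IterationHeadDemo {
    public static void main(String[] args) {
        IterationHeadSketch head = new IterationHeadSketch();
        head.triggerCheckpoint(1L);
        head.onFeedback(FeedbackEvent.record("b"));
        head.onFeedback(FeedbackEvent.barrier(1L)); // step 2, sent by the tail
        System.out.println(head.lastSnapshot());
    }
}
```

Records that arrive between the trigger and the returning barrier are exactly the ones that end up in the snapshot, which is why replaying them first after a restart reproduces the pre-failure order.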



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
