[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15943135#comment-15943135 ]

ASF GitHub Bot commented on FLINK-3257:
---------------------------------------

Github user senorcarbone commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1668#discussion_r108152267
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java ---
    @@ -64,6 +70,22 @@ public void init() throws Exception {
                super.init();
        }
     
    +   @Override
    +   protected boolean performCheckpoint(CheckpointMetaData 
checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics 
checkpointMetrics) throws Exception {
    +           LOG.debug("Starting checkpoint {} on task {}", 
checkpointMetaData.getCheckpointId(), getName());
    +
    +           synchronized (getCheckpointLock()) {
    +                   if (isRunning()) {
    +                           dataChannel.put(new Either.Right(new 
CheckpointBarrier(checkpointMetaData.getCheckpointId(), 
checkpointMetaData.getTimestamp(), checkpointOptions)));
    +                           
getEnvironment().acknowledgeCheckpoint(checkpointMetaData.getCheckpointId(), 
checkpointMetrics);
    --- End diff --
    
    Hm, not really. I also cannot think of a possible future use for loaded 
operators in IterationSink tasks. 


> Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
> -------------------------------------------------------------------
>
>                 Key: FLINK-3257
>                 URL: https://issues.apache.org/jira/browse/FLINK-3257
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>            Reporter: Paris Carbone
>            Assignee: Paris Carbone
>
> The current snapshotting algorithm does not support cycles in the execution 
> graph. An alternative scheme (ABS [1]) can potentially include the records 
> in transit through the back-edges of a cyclic execution graph in the snapshot 
> to achieve the same guarantees.
> One straightforward implementation of ABS for cyclic graphs could work along 
> the following lines:
> 1) Upon triggering a barrier in an IterationHead from the TaskManager, block 
> its output and start an upstream backup of all records forwarded from the 
> respective IterationSink.
> 2) The IterationSink should eventually forward the barrier of the current 
> snapshotting epoch to the IterationSource.
> 3) Upon receiving a barrier from the IterationSink, the IterationSource 
> should finalize the snapshot, unblock its output, emit all in-transit records 
> in FIFO order, and continue its usual execution.
> --
> Upon restart, the IterationSource should first emit all records from the 
> injected snapshot and then continue its usual execution.
> Several optimisations and slight variations are possible, but this can serve 
> as the initial implementation.
> [1] http://arxiv.org/abs/1506.08603
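
The three steps and the restart rule quoted above can be sketched as a small, single-threaded model. This is only an illustration under stated assumptions, not the code in the pull request: the class `IterationHeadSketch`, its `Event` type, and all method names are hypothetical and do not exist in Flink, records are plain strings, and the feedback edge is modelled as direct method calls instead of a blocking channel.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical single-threaded model of the iteration head; not Flink code. */
class IterationHeadSketch {

    /** One element on the feedback edge: a record, or a barrier for an epoch. */
    static final class Event {
        final String record;   // null for barriers
        final long barrierId;  // -1 for records

        private Event(String record, long barrierId) {
            this.record = record;
            this.barrierId = barrierId;
        }

        static Event record(String value) { return new Event(value, -1L); }
        static Event barrier(long checkpointId) { return new Event(null, checkpointId); }
        boolean isBarrier() { return record == null; }
    }

    private final Deque<String> backup = new ArrayDeque<>(); // records logged during a snapshot
    private final List<String> output = new ArrayList<>();   // stands in for the downstream output
    private List<String> snapshot = new ArrayList<>();       // last finalized snapshot
    private long pendingCheckpoint = -1L;                    // -1 means no snapshot in progress

    /** Step 1: a triggered barrier blocks the output and starts the backup. */
    void triggerBarrier(long checkpointId) {
        pendingCheckpoint = checkpointId;
    }

    /** Handles one element arriving over the back-edge from the iteration sink. */
    void onFeedback(Event event) {
        if (!event.isBarrier()) {
            if (pendingCheckpoint >= 0) {
                backup.addLast(event.record);  // output is blocked: log the record
            } else {
                output.add(event.record);      // usual execution
            }
            return;
        }
        // Step 2 happens at the sink: it forwards the barrier over the back-edge.
        if (pendingCheckpoint < 0 || event.barrierId != pendingCheckpoint) {
            return;                            // no matching snapshot in progress
        }
        // Step 3: finalize the snapshot, unblock, and emit the backup in FIFO order.
        snapshot = new ArrayList<>(backup);
        pendingCheckpoint = -1L;
        while (!backup.isEmpty()) {
            output.add(backup.pollFirst());
        }
    }

    /** Restart: emit the records of the injected snapshot before anything else. */
    void restore(List<String> restoredSnapshot) {
        output.addAll(restoredSnapshot);
    }

    List<String> output() { return output; }
    List<String> snapshot() { return snapshot; }
}
```

In this model, records that arrive between `triggerBarrier` and the matching barrier from the sink end up both in the snapshot and, after unblocking, in the output, which is how the in-transit records of the back-edge become part of the checkpoint.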



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
