[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15688077#comment-15688077 ]
ASF GitHub Bot commented on FLINK-3257: --------------------------------------- Github user senorcarbone commented on the issue: https://github.com/apache/flink/pull/1668 Ok, so I am progressing this a bit independently from the termination stuff and then we rebase to the first PR that is merged. I just changed everything and rebased to the current master. Some notable changes: - The `StreamIterationCheckpointingITCase` is not made deterministic, it fails after the first successful checkpoint once and the jobs stops after everything has been recovered appropriately. - I am now using ListState which is supposed to work like a charm with the rocksdb file backend. Note that with the default in-memory backend there is a high chance to get issues given the low memory capacity that it is given by default. - One tricky part that can be potentially done better is the way I set the logger in the StreamIterationHead (had to change the head op field access to `protected` in the OperatorChain) Whenever you find time go ahead and check it out. It passes my super-strict test which is a good thing. :) > Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs > ------------------------------------------------------------------- > > Key: FLINK-3257 > URL: https://issues.apache.org/jira/browse/FLINK-3257 > Project: Flink > Issue Type: Improvement > Reporter: Paris Carbone > Assignee: Paris Carbone > > The current snapshotting algorithm cannot support cycles in the execution > graph. An alternative scheme can potentially include records in-transit > through the back-edges of a cyclic execution graph (ABS [1]) to achieve the > same guarantees. > One straightforward implementation of ABS for cyclic graphs can work as > follows along the lines: > 1) Upon triggering a barrier in an IterationHead from the TaskManager start > block output and start upstream backup of all records forwarded from the > respective IterationSink. > 2) The IterationSink should eventually forward the current snapshotting epoch > barrier to the IterationSource. > 3) Upon receiving a barrier from the IterationSink, the IterationSource > should finalize the snapshot, unblock its output and emit all records > in-transit in FIFO order and continue the usual execution. > -- > Upon restart the IterationSource should emit all records from the injected > snapshot first and then continue its usual execution. > Several optimisations and slight variations can be potentially achieved but > this can be the initial implementation take. > [1] http://arxiv.org/abs/1506.08603 -- This message was sent by Atlassian JIRA (v6.3.4#6332)