[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17323732#comment-17323732 ]

Flink Jira Bot commented on FLINK-3257:
---

This issue is assigned but has not received an update in 7 days so it has been labeled "stale-assigned". If you are still working on the issue, please give an update and remove the label. If you are no longer working on the issue, please unassign so someone else may work on it. In 7 days the issue will be automatically unassigned.

> Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
> ---
>
> Key: FLINK-3257
> URL: https://issues.apache.org/jira/browse/FLINK-3257
> Project: Flink
> Issue Type: Improvement
> Components: API / DataStream
> Reporter: Paris Carbone
> Assignee: Paris Carbone
> Priority: Major
> Labels: pull-request-available, stale-assigned
> Time Spent: 10m
> Remaining Estimate: 0h
>
> The current snapshotting algorithm cannot support cycles in the execution
> graph. An alternative scheme can potentially include records in-transit
> through the back-edges of a cyclic execution graph (ABS [1]) to achieve the
> same guarantees.
> One straightforward implementation of ABS for cyclic graphs can work as
> follows along the lines:
> 1) Upon triggering a barrier in an IterationHead from the TaskManager start
> block output and start upstream backup of all records forwarded from the
> respective IterationSink.
> 2) The IterationSink should eventually forward the current snapshotting epoch
> barrier to the IterationSource.
> 3) Upon receiving a barrier from the IterationSink, the IterationSource
> should finalize the snapshot, unblock its output and emit all records
> in-transit in FIFO order and continue the usual execution.
> --
> Upon restart the IterationSource should emit all records from the injected
> snapshot first and then continue its usual execution.
> Several optimisations and slight variations can be potentially achieved but
> this can be the initial implementation take.
> [1] http://arxiv.org/abs/1506.08603

--
This message was sent by Atlassian Jira (v8.3.4#803005)
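
The three steps in the quoted issue description, plus the restart rule, can be sketched as a small simulation. This is a plain-Java stand-in and not Flink code: `IterationHeadSketch`, `Barrier`, `triggerBarrier`, `offerFromTail`, `drainFeedback` and `restore` are hypothetical names chosen for illustration, records are plain strings, and the feedback channel is an in-memory queue. It assumes a single in-flight checkpoint.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Plain-Java stand-in for an iteration head; every name here is hypothetical. */
class IterationHeadSketch {
    /** Barrier marker that the tail forwards over the feedback channel (step 2). */
    static final class Barrier {
        final long checkpointId;
        Barrier(long checkpointId) { this.checkpointId = checkpointId; }
    }

    private final Queue<Object> feedback = new ArrayDeque<>(); // back-edge from the tail
    private final List<String> emitted = new ArrayList<>();    // records sent downstream
    private final List<String> backup = new ArrayList<>();     // in-transit records while blocked
    private List<String> snapshot = new ArrayList<>();
    private boolean blocked = false;

    /** Step 1: block the output and start backing up records from the back-edge. */
    void triggerBarrier() {
        blocked = true;
        backup.clear();
    }

    /** The tail hands records and, eventually, the barrier to the head. */
    void offerFromTail(Object element) {
        feedback.add(element);
    }

    /** Processes everything currently queued on the feedback channel. */
    void drainFeedback() {
        Object element;
        while ((element = feedback.poll()) != null) {
            if (element instanceof Barrier) {
                // Step 3: finalize the snapshot, unblock, replay in-transit records in FIFO order.
                snapshot = new ArrayList<>(backup);
                blocked = false;
                emitted.addAll(backup);
                backup.clear();
            } else if (blocked) {
                backup.add((String) element);
            } else {
                emitted.add((String) element);
            }
        }
    }

    /** On restart: emit the records of the restored snapshot before anything else. */
    void restore(List<String> restoredSnapshot) {
        emitted.addAll(restoredSnapshot);
    }

    List<String> emitted() { return emitted; }

    List<String> snapshot() { return snapshot; }
}
```

In this sketch a record that arrives between `triggerBarrier()` and the returning `Barrier` is both part of the snapshot and emitted exactly once after unblocking, which is the property the description is after.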
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16706487#comment-16706487 ]

ASF GitHub Bot commented on FLINK-3257:
---

tzanko-matev commented on issue #1668: [FLINK-3257] Add Exactly-Once Processing Guarantees for Iterative DataStream Jobs
URL: https://github.com/apache/flink/pull/1668#issuecomment-443545153

Hi, are there any updates on this feature?

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16278282#comment-16278282 ]

ASF GitHub Bot commented on FLINK-3257:
---

Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668

well..major runtime changes are coming with FLIP-6, 15 and 16 so I would suggest you watch those. Loop FT will be included in one of these along with other loop redesign features.
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16266364#comment-16266364 ]

ASF GitHub Bot commented on FLINK-3257:
---

Github user dikei commented on the issue:

https://github.com/apache/flink/pull/1668

Hi. Do we have any updates on this :)
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15943328#comment-15943328 ]

ASF GitHub Bot commented on FLINK-3257:
---

Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668

sweet! thanks @StefanRRichter
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15943325#comment-15943325 ]

ASF GitHub Bot commented on FLINK-3257:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/1668

For raw operator state, override `AbstractStreamOperator::snapshotState(StateSnapshotContext context)` inside your operator. Your implementation calls super first and can then obtain the raw stream via `context.getRawOperatorStateOutput()`. This stream works like a normal output stream, except that you can also call `stream.startNewPartition()`. This signals that a new partition is started and that previous partitions are finalized/immutable. Partitions are the atomic units of state redistribution; think of them as the individual elements in a `ListCheckpointed` state.

For restoring, override `AbstractStreamOperator::initializeState(StateInitializationContext context)`. After calling super, `context.getRawOperatorStateInputs()` provides an iterable with one input stream per partition that your operator should restore.
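
The partition idea in StefanRRichter's comment can be illustrated with a stand-in model. The sketch below does not use the Flink classes: `PartitionedOutput`, `partitionInputs`, `snapshot` and `restore` are hypothetical names, and the only behaviour modelled is the one the comment states, namely that `startNewPartition()` closes the previous partition and that restore hands back one input stream per partition. In a real operator the streams would come from the `context` objects named in the comment.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Stand-in model of partitioned raw state; none of these types are Flink classes. */
class PartitionedStateSketch {

    /** A byte stream that remembers where each partition starts (hypothetical). */
    static final class PartitionedOutput extends ByteArrayOutputStream {
        private final List<Integer> partitionStarts = new ArrayList<>();

        /** Bytes written so far become immutable; a new partition starts at the current offset. */
        void startNewPartition() {
            partitionStarts.add(size());
        }

        /** One input stream per partition, as a restore would receive them. */
        List<InputStream> partitionInputs() {
            byte[] all = toByteArray();
            List<InputStream> inputs = new ArrayList<>();
            for (int i = 0; i < partitionStarts.size(); i++) {
                int from = partitionStarts.get(i);
                int to = (i + 1 < partitionStarts.size()) ? partitionStarts.get(i + 1) : all.length;
                inputs.add(new ByteArrayInputStream(all, from, to - from));
            }
            return inputs;
        }
    }

    /** Snapshot: write each element into its own partition. */
    static PartitionedOutput snapshot(List<String> elements) {
        PartitionedOutput raw = new PartitionedOutput();
        DataOutputStream out = new DataOutputStream(raw);
        try {
            for (String element : elements) {
                raw.startNewPartition();
                out.writeUTF(element);
            }
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return raw;
    }

    /** Restore: read exactly one element from every partition stream handed in. */
    static List<String> restore(List<InputStream> partitions) {
        List<String> restored = new ArrayList<>();
        try {
            for (InputStream partition : partitions) {
                restored.add(new DataInputStream(partition).readUTF());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return restored;
    }
}
```

Because each partition is self-contained, handing an operator instance only a subset of the streams (as a redistribution might) still restores whole elements rather than fragments.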
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15943245#comment-15943245 ]

ASF GitHub Bot commented on FLINK-3257:
---

Github user senorcarbone commented on a diff in the pull request:

https://github.com/apache/flink/pull/1668#discussion_r108166093

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java ---
@@ -17,100 +17,164 @@
 package org.apache.flink.streaming.runtime.tasks;

-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
+import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.types.Either;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * TODO write javadoc
+ *
+ * - open a list state per snapshot process
+ * - book-keep snapshot logs
+ * - Clean up state when a savepoint is complete - ONLY in-transit records who do NOT belong in other snapshots
+ *
+ * @param
+ */
 @Internal
-public class StreamIterationHead extends OneInputStreamTask {
+public class StreamIterationHead extends OneInputStreamTask {

     private static final Logger LOG = LoggerFactory.getLogger(StreamIterationHead.class);

     private volatile boolean running = true;

-    // -
+    private volatile RecordWriterOutput[] outputs;
+
+    private UpstreamLogger upstreamLogger;
+
+    private Object lock;
+
+    @Override
+    public void init() throws Exception {
+        this.lock = getCheckpointLock();
+        getConfiguration().setStreamOperator(new UpstreamLogger(getConfiguration()));
+        operatorChain = new OperatorChain<>(this);
+        this.upstreamLogger = (UpstreamLogger) operatorChain.getHeadOperator();
--- End diff --

it's another instance that is why I was fetching it back there. The OperatorChain basically deserialises and sets up another instance through the configuration.
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15943135#comment-15943135 ]

ASF GitHub Bot commented on FLINK-3257:
---

Github user senorcarbone commented on a diff in the pull request:

https://github.com/apache/flink/pull/1668#discussion_r108152267

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java ---
@@ -64,6 +70,22 @@ public void init() throws Exception {
         super.init();
     }

+    @Override
+    protected boolean performCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
+        LOG.debug("Starting checkpoint {} on task {}", checkpointMetaData.getCheckpointId(), getName());
+
+        synchronized (getCheckpointLock()) {
+            if (isRunning()) {
+                dataChannel.put(new Either.Right(new CheckpointBarrier(checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp(), checkpointOptions)));
+                getEnvironment().acknowledgeCheckpoint(checkpointMetaData.getCheckpointId(), checkpointMetrics);
--- End diff --

Hm, not really. I cannot think of a possible usage of loaded operators in IterationSink tasks in the future either.
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15943105#comment-15943105 ]

ASF GitHub Bot commented on FLINK-3257:
---

Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668

Thanks for the review @gyfora and @StephanEwen, these are very good points.

@StephanEwen it makes sense not to index or keep metadata of individual records in log slices; it is extra overhead. Writing raw operator state makes sense, so I will do that once @StefanRRichter gives me some pointers, which would be great. Any redistribution of the checkpoint slices would violate causality, so I hope the "list redistribution pattern" actually keeps the set of registered operator states per instance intact. The garbage collection issue still remains, but maybe (if @StefanRRichter approves) I can add an `unregister` functionality to the `OperatorStateStore`.

I can also add preconfigured operators (not that they will be reused anywhere). It is cleaner, but I really need to see how I can get full control of the `task` checkpointing behaviour from the `operator` level (since the default task checkpointing behaviour is altered at the task level).
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15941700#comment-15941700 ]

ASF GitHub Bot commented on FLINK-3257:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1668#discussion_r108034229

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java ---
@@ -64,6 +70,22 @@ public void init() throws Exception {
         super.init();
     }

+    @Override
+    protected boolean performCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
+        LOG.debug("Starting checkpoint {} on task {}", checkpointMetaData.getCheckpointId(), getName());
+
+        synchronized (getCheckpointLock()) {
+            if (isRunning()) {
+                dataChannel.put(new Either.Right(new CheckpointBarrier(checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp(), checkpointOptions)));
+                getEnvironment().acknowledgeCheckpoint(checkpointMetaData.getCheckpointId(), checkpointMetrics);
--- End diff --

Can the `IterationTailTask` contain operators as well, or is it always a task without operators? If it has operators, we cannot immediately acknowledge here, but need to delegate to the superclass checkpoint method instead.
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15940873#comment-15940873 ] ASF GitHub Bot commented on FLINK-3257: --- Github user gyfora commented on a diff in the pull request: https://github.com/apache/flink/pull/1668#discussion_r107967886 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java --- @@ -119,13 +183,154 @@ protected void cleanup() throws Exception { * Creates the identification string with which head and tail task find the shared blocking * queue for the back channel. The identification string is unique per parallel head/tail pair * per iteration per job. -* -* @param jid The job ID. -* @param iterationID The id of the iteration in the job. +* +* @param jid The job ID. +* @param iterationID The id of the iteration in the job. * @param subtaskIndex The parallel subtask number * @return The identification string. */ public static String createBrokerIdString(JobID jid, String iterationID, int subtaskIndex) { return jid + "-" + iterationID + "-" + subtaskIndex; } + + /** +* An internal operator that solely serves as a state logging facility for persisting, +* partitioning and restoring output logs for dataflow cycles consistently. To support concurrency, +* logs are being sliced proportionally to the number of concurrent snapshots. This allows committed +* output logs to be uniquely identified and cleared after each complete checkpoint. +* +* The design is based on the following assumptions: +* +* - A slice is named after a checkpoint ID. Checkpoint IDs are numerically ordered within an execution. +* - Each checkpoint barrier arrives back in FIFO order, thus we discard log slices in respective FIFO order. +* - Upon restoration the logger sorts sliced logs in the same FIFO order and returns an Iterable that +* gives a singular view of the log. 
+* +* TODO it seems that ListState.clear does not unregister state. We need to put a hook for that. +* +* @param +*/ + public static class UpstreamLogger extends AbstractStreamOperator implements OneInputStreamOperator { + + private final StreamConfig config; + + private LinkedList>> slicedLog = new LinkedList<>(); + + private UpstreamLogger(StreamConfig config) { + this.config = config; + } + + public void logRecord(StreamRecord record) throws Exception { + if (!slicedLog.isEmpty()) { + slicedLog.getLast().add(record); + } + } + + public void createSlice(String sliceID) throws Exception { + ListState> nextSlice = + getOperatorStateBackend().getOperatorState(new ListStateDescriptor<>(sliceID, + config.>getTypeSerializerOut(getUserCodeClassloader(; + slicedLog.addLast(nextSlice); + } + + public void discardSlice() { + ListState> logToEvict = slicedLog.pollFirst(); + logToEvict.clear(); + } + + public Iterable> getReplayLog() throws Exception { + final List logSlices = new ArrayList<>(getOperatorStateBackend().getRegisteredStateNames()); + Collections.sort(logSlices, new Comparator() { + @Override + public int compare(String o1, String o2) { + return Long.valueOf(o1).compareTo(Long.valueOf(o2)); + } + }); + + final List>> wrappedIterators = new ArrayList<>(); + for (String splitID : logSlices) { + wrappedIterators.add(getOperatorStateBackend() + .getOperatorState(new ListStateDescriptor<>(splitID, + config.>getTypeSerializerOut(getUserCodeClassloader(.get().iterator()); + } + + if (wrappedIterators.size() == 0) { + return new Iterable>() { + @Override + public Iterator> iterator() { + return Collections.emptyListIterator(); +
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15940878#comment-15940878 ] ASF GitHub Bot commented on FLINK-3257: --- Github user gyfora commented on a diff in the pull request: https://github.com/apache/flink/pull/1668#discussion_r107969496 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java --- @@ -17,100 +17,164 @@ package org.apache.flink.streaming.runtime.tasks; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; - import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.io.network.api.CheckpointBarrier; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.io.RecordWriterOutput; import org.apache.flink.streaming.runtime.io.BlockingQueueBroker; +import org.apache.flink.streaming.runtime.io.RecordWriterOutput; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.types.Either; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.LinkedList; +import 
java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * TODO write javadoc + * + * - open a list state per snapshot process + * - book-keep snapshot logs + * - Clean up state when a savepoint is complete - ONLY in-transit records who do NOT belong in other snapshots + * + * @param + */ @Internal -public class StreamIterationHead extends OneInputStreamTask { +public class StreamIterationHead extends OneInputStreamTask { private static final Logger LOG = LoggerFactory.getLogger(StreamIterationHead.class); private volatile boolean running = true; - // - + private volatile RecordWriterOutput[] outputs; + + private UpstreamLogger upstreamLogger; + + private Object lock; + + @Override + public void init() throws Exception { + this.lock = getCheckpointLock(); + getConfiguration().setStreamOperator(new UpstreamLogger(getConfiguration())); + operatorChain = new OperatorChain<>(this); + this.upstreamLogger = (UpstreamLogger) operatorChain.getHeadOperator(); + } + @Override protected void run() throws Exception { - + final String iterationId = getConfiguration().getIterationId(); if (iterationId == null || iterationId.length() == 0) { throw new Exception("Missing iteration ID in the task configuration"); } - - final String brokerID = createBrokerIdString(getEnvironment().getJobID(), iterationId , - getEnvironment().getTaskInfo().getIndexOfThisSubtask()); - + final String brokerID = createBrokerIdString(getEnvironment().getJobID(), iterationId, + getEnvironment().getTaskInfo().getIndexOfThisSubtask()); final long iterationWaitTime = getConfiguration().getIterationWaitTime(); final boolean shouldWait = iterationWaitTime > 0; - final BlockingQueue> dataChannel = new ArrayBlockingQueue>(1); + final BlockingQueue, CheckpointBarrier>> dataChannel + = new ArrayBlockingQueue<>(1); // offer the queue for the tail BlockingQueueBroker.INSTANCE.handIn(brokerID, dataChannel); LOG.info("Iteration head {} added 
feedback queue under {}", getName(), brokerID); // do the work try { - @Suppres
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15940877#comment-15940877 ] ASF GitHub Bot commented on FLINK-3257: --- Github user gyfora commented on a diff in the pull request: https://github.com/apache/flink/pull/1668#discussion_r107967910 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java --- @@ -119,13 +183,154 @@ protected void cleanup() throws Exception { * Creates the identification string with which head and tail task find the shared blocking * queue for the back channel. The identification string is unique per parallel head/tail pair * per iteration per job. -* -* @param jid The job ID. -* @param iterationID The id of the iteration in the job. +* +* @param jid The job ID. +* @param iterationID The id of the iteration in the job. * @param subtaskIndex The parallel subtask number * @return The identification string. */ public static String createBrokerIdString(JobID jid, String iterationID, int subtaskIndex) { return jid + "-" + iterationID + "-" + subtaskIndex; } + + /** +* An internal operator that solely serves as a state logging facility for persisting, +* partitioning and restoring output logs for dataflow cycles consistently. To support concurrency, +* logs are being sliced proportionally to the number of concurrent snapshots. This allows committed +* output logs to be uniquely identified and cleared after each complete checkpoint. +* +* The design is based on the following assumptions: +* +* - A slice is named after a checkpoint ID. Checkpoint IDs are numerically ordered within an execution. +* - Each checkpoint barrier arrives back in FIFO order, thus we discard log slices in respective FIFO order. +* - Upon restoration the logger sorts sliced logs in the same FIFO order and returns an Iterable that +* gives a singular view of the log. 
+* +* TODO it seems that ListState.clear does not unregister state. We need to put a hook for that. +* +* @param +*/ + public static class UpstreamLogger extends AbstractStreamOperator implements OneInputStreamOperator { + + private final StreamConfig config; + + private LinkedList>> slicedLog = new LinkedList<>(); + + private UpstreamLogger(StreamConfig config) { + this.config = config; + } + + public void logRecord(StreamRecord record) throws Exception { + if (!slicedLog.isEmpty()) { + slicedLog.getLast().add(record); + } + } + + public void createSlice(String sliceID) throws Exception { + ListState> nextSlice = + getOperatorStateBackend().getOperatorState(new ListStateDescriptor<>(sliceID, + config.>getTypeSerializerOut(getUserCodeClassloader(; + slicedLog.addLast(nextSlice); + } + + public void discardSlice() { + ListState> logToEvict = slicedLog.pollFirst(); + logToEvict.clear(); + } + + public Iterable> getReplayLog() throws Exception { + final List logSlices = new ArrayList<>(getOperatorStateBackend().getRegisteredStateNames()); + Collections.sort(logSlices, new Comparator() { + @Override + public int compare(String o1, String o2) { + return Long.valueOf(o1).compareTo(Long.valueOf(o2)); + } + }); + + final List>> wrappedIterators = new ArrayList<>(); + for (String splitID : logSlices) { + wrappedIterators.add(getOperatorStateBackend() + .getOperatorState(new ListStateDescriptor<>(splitID, + config.>getTypeSerializerOut(getUserCodeClassloader(.get().iterator()); + } + + if (wrappedIterators.size() == 0) { + return new Iterable>() { + @Override + public Iterator> iterator() { + return Collections.emptyListIterator(); +
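The comparator in `getReplayLog` in the quoted diff parses slice names as numbers before comparing them. A minimal sketch of why that matters, assuming slice names are decimal checkpoint IDs as the Javadoc states (`SliceOrderSketch` and `sortByCheckpointId` are hypothetical names for illustration, not part of the PR):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical helper, for illustration only; not part of the pull request. */
final class SliceOrderSketch {

    /** Returns a copy of the slice names ordered by their numeric checkpoint ID. */
    static List<String> sortByCheckpointId(List<String> sliceNames) {
        List<String> sorted = new ArrayList<>(sliceNames);
        // Compare the parsed numbers; plain string ordering would place "10" before "9".
        sorted.sort(Comparator.comparingLong((String name) -> Long.parseLong(name)));
        return sorted;
    }
}
```

With plain string ordering, the slice of checkpoint 10 would be replayed before the slice of checkpoint 9, which would break the FIFO assumption the design relies on.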
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15940874#comment-15940874 ] ASF GitHub Bot commented on FLINK-3257: --- Github user gyfora commented on a diff in the pull request: https://github.com/apache/flink/pull/1668#discussion_r107968436 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java --- @@ -119,13 +183,154 @@ protected void cleanup() throws Exception { * Creates the identification string with which head and tail task find the shared blocking * queue for the back channel. The identification string is unique per parallel head/tail pair * per iteration per job. -* -* @param jid The job ID. -* @param iterationID The id of the iteration in the job. +* +* @param jid The job ID. +* @param iterationID The id of the iteration in the job. * @param subtaskIndex The parallel subtask number * @return The identification string. */ public static String createBrokerIdString(JobID jid, String iterationID, int subtaskIndex) { return jid + "-" + iterationID + "-" + subtaskIndex; } + + /** +* An internal operator that solely serves as a state logging facility for persisting, +* partitioning and restoring output logs for dataflow cycles consistently. To support concurrency, +* logs are being sliced proportionally to the number of concurrent snapshots. This allows committed +* output logs to be uniquely identified and cleared after each complete checkpoint. +* +* The design is based on the following assumptions: +* +* - A slice is named after a checkpoint ID. Checkpoint IDs are numerically ordered within an execution. +* - Each checkpoint barrier arrives back in FIFO order, thus we discard log slices in respective FIFO order. +* - Upon restoration the logger sorts sliced logs in the same FIFO order and returns an Iterable that +* gives a singular view of the log. 
+* +* TODO it seems that ListState.clear does not unregister state. We need to put a hook for that. +* +* @param +*/ + public static class UpstreamLogger extends AbstractStreamOperator implements OneInputStreamOperator { + + private final StreamConfig config; + + private LinkedList>> slicedLog = new LinkedList<>(); + + private UpstreamLogger(StreamConfig config) { + this.config = config; + } + + public void logRecord(StreamRecord record) throws Exception { + if (!slicedLog.isEmpty()) { + slicedLog.getLast().add(record); + } + } + + public void createSlice(String sliceID) throws Exception { + ListState> nextSlice = + getOperatorStateBackend().getOperatorState(new ListStateDescriptor<>(sliceID, + config.>getTypeSerializerOut(getUserCodeClassloader(; + slicedLog.addLast(nextSlice); + } + + public void discardSlice() { + ListState> logToEvict = slicedLog.pollFirst(); + logToEvict.clear(); + } + + public Iterable> getReplayLog() throws Exception { + final List logSlices = new ArrayList<>(getOperatorStateBackend().getRegisteredStateNames()); + Collections.sort(logSlices, new Comparator() { + @Override + public int compare(String o1, String o2) { + return Long.valueOf(o1).compareTo(Long.valueOf(o2)); + } + }); + + final List>> wrappedIterators = new ArrayList<>(); + for (String splitID : logSlices) { + wrappedIterators.add(getOperatorStateBackend() + .getOperatorState(new ListStateDescriptor<>(splitID, + config.>getTypeSerializerOut(getUserCodeClassloader(.get().iterator()); + } + + if (wrappedIterators.size() == 0) { + return new Iterable>() { + @Override + public Iterator> iterator() { + return Collections.emptyListIterator(); +
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15940875#comment-15940875 ] ASF GitHub Bot commented on FLINK-3257: --- Github user gyfora commented on a diff in the pull request: https://github.com/apache/flink/pull/1668#discussion_r107968567 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java --- @@ -119,13 +183,154 @@ protected void cleanup() throws Exception { * Creates the identification string with which head and tail task find the shared blocking * queue for the back channel. The identification string is unique per parallel head/tail pair * per iteration per job. -* -* @param jid The job ID. -* @param iterationID The id of the iteration in the job. +* +* @param jid The job ID. +* @param iterationID The id of the iteration in the job. * @param subtaskIndex The parallel subtask number * @return The identification string. */ public static String createBrokerIdString(JobID jid, String iterationID, int subtaskIndex) { return jid + "-" + iterationID + "-" + subtaskIndex; } + + /** +* An internal operator that solely serves as a state logging facility for persisting, +* partitioning and restoring output logs for dataflow cycles consistently. To support concurrency, +* logs are being sliced proportionally to the number of concurrent snapshots. This allows committed +* output logs to be uniquely identified and cleared after each complete checkpoint. +* +* The design is based on the following assumptions: +* +* - A slice is named after a checkpoint ID. Checkpoint IDs are numerically ordered within an execution. +* - Each checkpoint barrier arrives back in FIFO order, thus we discard log slices in respective FIFO order. +* - Upon restoration the logger sorts sliced logs in the same FIFO order and returns an Iterable that +* gives a singular view of the log. 
+* +* TODO it seems that ListState.clear does not unregister state. We need to put a hook for that. +* +* @param +*/ + public static class UpstreamLogger extends AbstractStreamOperator implements OneInputStreamOperator { + + private final StreamConfig config; + + private LinkedList>> slicedLog = new LinkedList<>(); + + private UpstreamLogger(StreamConfig config) { + this.config = config; + } + + public void logRecord(StreamRecord record) throws Exception { + if (!slicedLog.isEmpty()) { + slicedLog.getLast().add(record); + } + } + + public void createSlice(String sliceID) throws Exception { + ListState> nextSlice = + getOperatorStateBackend().getOperatorState(new ListStateDescriptor<>(sliceID, + config.>getTypeSerializerOut(getUserCodeClassloader(; + slicedLog.addLast(nextSlice); + } + + public void discardSlice() { + ListState> logToEvict = slicedLog.pollFirst(); + logToEvict.clear(); + } + + public Iterable> getReplayLog() throws Exception { + final List logSlices = new ArrayList<>(getOperatorStateBackend().getRegisteredStateNames()); + Collections.sort(logSlices, new Comparator() { + @Override + public int compare(String o1, String o2) { + return Long.valueOf(o1).compareTo(Long.valueOf(o2)); + } + }); + + final List>> wrappedIterators = new ArrayList<>(); + for (String splitID : logSlices) { + wrappedIterators.add(getOperatorStateBackend() + .getOperatorState(new ListStateDescriptor<>(splitID, + config.>getTypeSerializerOut(getUserCodeClassloader(.get().iterator()); + } + + if (wrappedIterators.size() == 0) { + return new Iterable>() { + @Override + public Iterator> iterator() { + return Collections.emptyListIterator(); +
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15940876#comment-15940876 ] ASF GitHub Bot commented on FLINK-3257: --- Github user gyfora commented on a diff in the pull request: https://github.com/apache/flink/pull/1668#discussion_r107968935 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java --- @@ -17,100 +17,164 @@ package org.apache.flink.streaming.runtime.tasks; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; - import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.io.network.api.CheckpointBarrier; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.io.RecordWriterOutput; import org.apache.flink.streaming.runtime.io.BlockingQueueBroker; +import org.apache.flink.streaming.runtime.io.RecordWriterOutput; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.types.Either; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.LinkedList; +import 
java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * TODO write javadoc + * + * - open a list state per snapshot process + * - book-keep snapshot logs + * - Clean up state when a savepoint is complete - ONLY in-transit records who do NOT belong in other snapshots + * + * @param + */ @Internal -public class StreamIterationHead extends OneInputStreamTask { +public class StreamIterationHead extends OneInputStreamTask { private static final Logger LOG = LoggerFactory.getLogger(StreamIterationHead.class); private volatile boolean running = true; - // - + private volatile RecordWriterOutput[] outputs; + + private UpstreamLogger upstreamLogger; + + private Object lock; + + @Override + public void init() throws Exception { + this.lock = getCheckpointLock(); + getConfiguration().setStreamOperator(new UpstreamLogger(getConfiguration())); + operatorChain = new OperatorChain<>(this); + this.upstreamLogger = (UpstreamLogger) operatorChain.getHeadOperator(); --- End diff -- if this is the same UpstreamLogger instance that you pass 2 lines above then why not use that? :) > Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs > --- > > Key: FLINK-3257 > URL: https://issues.apache.org/jira/browse/FLINK-3257 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Paris Carbone >Assignee: Paris Carbone > > The current snapshotting algorithm cannot support cycles in the execution > graph. An alternative scheme can potentially include records in-transit > through the back-edges of a cyclic execution graph (ABS [1]) to achieve the > same guarantees. > One straightforward implementation of ABS for cyclic graphs can work as > follows along the lines: > 1) Upon triggering a barrier in an IterationHead from the TaskManager start > block output and start upstream backup of all records forwarded from the > respective IterationSink. 
> 2) The IterationSink should eventually forward the current snapshotting epoch > barrier to the IterationSource. > 3) Upon receiving a barrier from the IterationSink, the IterationSource > should finalize the snapshot, unblock its output and emit all records > in-transit in FIFO order and continue the usual execution. > -- > Upon restart the IterationSource should emit all rec
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15940777#comment-15940777 ] ASF GitHub Bot commented on FLINK-3257: --- Github user senorcarbone commented on the issue: https://github.com/apache/flink/pull/1668 Hey. Any update/opinion/something anyone? Just a gentle reminder, sorry if this sounds a bit desperate :) > Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs > --- > > Key: FLINK-3257 > URL: https://issues.apache.org/jira/browse/FLINK-3257 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Paris Carbone >Assignee: Paris Carbone > > The current snapshotting algorithm cannot support cycles in the execution > graph. An alternative scheme can potentially include records in-transit > through the back-edges of a cyclic execution graph (ABS [1]) to achieve the > same guarantees. > One straightforward implementation of ABS for cyclic graphs can work as > follows along the lines: > 1) Upon triggering a barrier in an IterationHead from the TaskManager start > block output and start upstream backup of all records forwarded from the > respective IterationSink. > 2) The IterationSink should eventually forward the current snapshotting epoch > barrier to the IterationSource. > 3) Upon receiving a barrier from the IterationSink, the IterationSource > should finalize the snapshot, unblock its output and emit all records > in-transit in FIFO order and continue the usual execution. > -- > Upon restart the IterationSource should emit all records from the injected > snapshot first and then continue its usual execution. > Several optimisations and slight variations can be potentially achieved but > this can be the initial implementation take. > [1] http://arxiv.org/abs/1506.08603 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15926037#comment-15926037 ] ASF GitHub Bot commented on FLINK-3257: --- Github user senorcarbone commented on the issue: https://github.com/apache/flink/pull/1668
I have just rebased to the current master. Please let me wrap this up. It has been over a year :) Unregistering state in the OperatorStateStore is a very tiny fix. @StephanEwen @StefanRRichter Is it OK with you to make this small addition in this PR, or should I create a separate issue? I hope you have a few spare minutes to take a quick look this time.
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15905044#comment-15905044 ] ASF GitHub Bot commented on FLINK-3257: --- Github user senorcarbone commented on the issue: https://github.com/apache/flink/pull/1668
Hey @addisonj. Sure! You could perhaps review the changes and maybe see how to discard empty operator states if you are motivated. This is the only pending issue for this PR. Thanks!
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904089#comment-15904089 ] ASF GitHub Bot commented on FLINK-3257: --- Github user addisonj commented on the issue: https://github.com/apache/flink/pull/1668
Very interested in this work. It sounds like there are a few loose ends and then some cleanup before it might be ready for merge. @senorcarbone or @StephanEwen, is there anything that can be supported by someone else? Would love to help wherever possible.
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829678#comment-15829678 ] ASF GitHub Bot commented on FLINK-3257: --- Github user senorcarbone commented on the issue: https://github.com/apache/flink/pull/1668
The last update implements a variant of what @StephanEwen proposes. We have put some more thought into this offline too, thanks to @gyfora! The idea is that instead of putting records to each `ListState`, the output log is partitioned into multiple log "slices", one per concurrent checkpoint.
More specifically, the `UpstreamLogger` operator at the `IterationHead` slices logs proportionally to the number of concurrent snapshots. This also allows committed output logs to be uniquely identified and cleared after each complete checkpoint. The design is based on the following assumptions:
- A slice is named after a checkpoint ID. Checkpoint IDs are numerically ordered within an execution.
- Each checkpoint barrier arrives back in FIFO order, thus we discard log slices in respective FIFO order.
- Upon restoration the logger sorts sliced logs in the same FIFO order and returns an Iterable that gives a singular view of the log.
Before I polish this we need to close a memory leak. The `clear` operation of `State` cleans the state under the registered id, but it does not seem to unregister the key itself. Does anyone have an idea on how to unregister state properly? Hope this gets some attention to wrap it up, it's been too long :).
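The slicing scheme described in the comment above can be sketched in plain Java. This is a minimal illustration under stated assumptions, not the PR's implementation: the class name `SlicedLogSketch` is hypothetical, in-memory lists stand in for Flink's `ListState`, and the method names only loosely mirror those of the `UpstreamLogger` in the quoted diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical stand-in for the sliced output log; plain collections replace Flink ListState. */
final class SlicedLogSketch<T> {

    // Slices keyed by checkpoint ID; the TreeMap keeps them in numeric order for replay.
    private final TreeMap<Long, List<T>> slices = new TreeMap<>();

    /** Opens a new slice, named after the checkpoint whose barrier was just triggered. */
    void createSlice(long checkpointId) {
        slices.put(checkpointId, new ArrayList<>());
    }

    /** Appends a feedback record to the most recently opened slice, if any slice is open. */
    void logRecord(T record) {
        if (!slices.isEmpty()) {
            slices.lastEntry().getValue().add(record);
        }
    }

    /** Drops the oldest slice; barriers return in FIFO order, so the oldest completes first. */
    void discardSlice() {
        slices.pollFirstEntry();
    }

    /** Returns a single view over all retained records, oldest slice first. */
    List<T> replayLog() {
        List<T> out = new ArrayList<>();
        for (List<T> slice : slices.values()) {
            out.addAll(slice);
        }
        return out;
    }
}
```

Keying slices by checkpoint ID is what lets a completed checkpoint's log be identified and dropped without touching records that belong to later, still in-progress checkpoints.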
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15773700#comment-15773700 ] ASF GitHub Bot commented on FLINK-3257: --- Github user senorcarbone commented on the issue: https://github.com/apache/flink/pull/1668
@StephanEwen could you check my question above?
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15754312#comment-15754312 ] ASF GitHub Bot commented on FLINK-3257: --- Github user senorcarbone commented on the issue: https://github.com/apache/flink/pull/1668
Working on it atm. I decided to make the following optimisations, but I want to quickly make sure that async checkpointing works the way I believe it does:
- Most importantly, I am changing the iteration head to always forward records. Their effects are not present in any in-progress snapshot anyway, so I should have done that from the very beginning. :)
- If `ListState` is checkpointed asynchronously (depending on the backend, I suppose), then the version that is current during the snapshot will be persisted as a copy. That means we can apply mutations right away and therefore reset the state, right after invoking the snapshot, to the beginning of the next in-progress snapshot (some indexing involved). That way we do not need to open new ListStates in the first place.
Does this make sense?
@StephanEwen Please correct me if I am wrong regarding the second point. I am just not very familiar with async snapshotting for `ListState` (the documentation is not clear to me on this). Mind also that I do not use the `CheckpointedAsynchronously` interface; it seems to be heading towards deprecation. Thanks!
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15745100#comment-15745100 ]

ASF GitHub Bot commented on FLINK-3257:
---
Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668

agreed @StephanEwen! I will do that.
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15744752#comment-15744752 ]

ASF GitHub Bot commented on FLINK-3257:
---
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/1668

@senorcarbone I agree, let's fix the multiple checkpoints issue and do the rest in FLIP-15.

The other operators have a pretty simple way of doing this:

- For synchronously checkpointed operators, there is no need to do anything: the synchronous part of one checkpoint is over when the next starts (because it is synchronous ;-)).
- For asynchronously checkpointed state, the state backend needs to be able to hold multiple snapshots, which are saved by multiple background threads.
- None of the operators deal with in-flight data, which makes their job easy.

Dealing with in-flight data probably means that you need to open a ListState for each checkpoint that arrives and add the feedback values to each state until that particular checkpoint's barrier comes back through the feedback channel. I think that should be sufficient.
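The per-checkpoint logging described in the comment above can be sketched in plain Java. This is only an illustration under stated assumptions: `FeedbackLog` and its method names are hypothetical and not part of Flink, and an ordinary `List` stands in for the `ListState` that the comment proposes to open for each checkpoint.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the iteration head's logging logic; not Flink API.
final class FeedbackLog<T> {
    // One log per in-progress checkpoint, keyed by checkpoint id.
    // In the proposal each log would be a ListState; a List is assumed here.
    private final Map<Long, List<T>> pending = new LinkedHashMap<>();

    // Called when a checkpoint barrier is triggered at the head.
    void onCheckpointTriggered(long checkpointId) {
        pending.put(checkpointId, new ArrayList<>());
    }

    // Called for every record that arrives over the feedback channel.
    // The record is appended to the log of every checkpoint still in progress.
    void onFeedbackRecord(T record) {
        for (List<T> log : pending.values()) {
            log.add(record);
        }
    }

    // Called when the barrier of the given checkpoint comes back through the
    // feedback channel. Returns that checkpoint's in-flight records in FIFO
    // order, or an empty list if the checkpoint id is unknown.
    List<T> onBarrierReturned(long checkpointId) {
        List<T> log = pending.remove(checkpointId);
        return log == null ? new ArrayList<>() : log;
    }

    // Number of checkpoints whose barrier has not yet returned.
    int inProgress() {
        return pending.size();
    }
}
```

With two overlapping checkpoints, a record that arrives while both are in progress ends up in both logs, which is what allows a savepoint to run concurrently with a regular checkpoint.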
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15744648#comment-15744648 ]

ASF GitHub Bot commented on FLINK-3257:
---
Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668

These are some good points @StephanEwen, thanks for checking it. How about the following, regarding each issue:

- `Concurrent Checkpoints`: Looks like an improvement, but I can certainly do it in this PR if it is a crucial one. Can you elaborate a bit more, or point me to other operator state examples that handle concurrent checkpoints, to give me an idea of how you want to do it?
- `Reconfiguration`: Sounds interesting, but I am not really aware of it from the dev list. If it is simple enough I could add support for it here. Otherwise I would suggest we address this in a separate JIRA and PR as an improvement. Is there a design document somewhere on how we plan to achieve reconfiguration and repartitioning for operator state specifically?
- `At-most-once blocking queue`: It is obvious from my previous comments that I do not approve of this part, but it is something we already got rid of in [FLIP-15](https://cwiki.apache.org/confluence/display/FLINK/FLIP-15+Scoped+Loops+and+Job+Termination) ([this](https://github.com/FouadMA/flink/commit/9adaac435bcaf3552afe564c739d4e8fd79c433b) commit). How about we address this together with the deadlocks in FLIP-15?
- `Deadlocks`: I like the elastic spilling channel idea to resolve deadlocks. I need time to dig a bit more into this and make sure we solve deadlocks and not just improve the situation. Is it ok with you if we address that in [FLIP-15](https://cwiki.apache.org/confluence/display/FLINK/FLIP-15+Scoped+Loops+and+Job+Termination)? I need more time for this part. Plus, we need to combine the absence of expiring queues with a proper termination algorithm (otherwise we just solve the deadlocks and the jobs never terminate).

What do you think?
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15743344#comment-15743344 ]

ASF GitHub Bot commented on FLINK-3257:
---
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/1668

To suggest some way to fix the guarantees: to my mind, the crux lies in the way that the feedback channel is implemented. A simple blocking queue just does not cut it for that case. To make this proper, I think we need to do the following:

- Have an elastic feedback channel (unbounded) with a certain memory budget, that can spill if needed. I think it would be best implemented holding data serialized.
- On checkpoint, one simply adds the feedback channel data (already bytes) to the checkpoint.
- The source task should probably prioritize reading from the feedback channel, to keep it always as small as possible.
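The elastic feedback channel proposed above could look roughly like the following. This is a minimal sketch, not the design from the pull request or FLIP-15: the class `ElasticFeedbackChannel` and all of its members are hypothetical, and a second in-memory queue stands in for the spill file so that the example stays self-contained.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical unbounded feedback channel holding serialized records.
final class ElasticFeedbackChannel {
    private final long memoryBudgetBytes;
    private long bytesInMemory;
    private final Deque<byte[]> inMemory = new ArrayDeque<>();
    // Assumption: stands in for a spill file on disk.
    private final Deque<byte[]> spilled = new ArrayDeque<>();

    ElasticFeedbackChannel(long memoryBudgetBytes) {
        this.memoryBudgetBytes = memoryBudgetBytes;
    }

    // Never blocks and never drops. Once anything has been spilled, later
    // records are spilled too, so FIFO order is preserved across both queues.
    void add(byte[] serializedRecord) {
        boolean fits = bytesInMemory + serializedRecord.length <= memoryBudgetBytes;
        if (spilled.isEmpty() && fits) {
            inMemory.addLast(serializedRecord);
            bytesInMemory += serializedRecord.length;
        } else {
            spilled.addLast(serializedRecord);
        }
    }

    // Returns the oldest record, or null if the channel is empty.
    byte[] poll() {
        byte[] next = inMemory.pollFirst();
        if (next != null) {
            bytesInMemory -= next.length;
            return next;
        }
        return spilled.pollFirst();
    }

    // On checkpoint: the channel contents, already bytes, in FIFO order.
    List<byte[]> snapshot() {
        List<byte[]> copy = new ArrayList<>(inMemory);
        copy.addAll(spilled);
        return copy;
    }

    int spilledCount() {
        return spilled.size();
    }
}
```

Because `add` never rejects a record and never blocks, a loop function that emits more than it consumes grows the spill area instead of dropping data or stalling the tail task, which is the reason the comment suggests having the source prioritize the feedback channel to keep it small.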
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15743343#comment-15743343 ]

ASF GitHub Bot commented on FLINK-3257:
---
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/1668

Thanks for the reminder, I went over the code today. The code looks mostly good, but here are some thoughts:

- The head task supports only one concurrent checkpoint. In general, the tasks need to support multiple checkpoints being in progress at the same time. It frequently happens when people trigger savepoints concurrent to a running checkpoint. I think that is important to support.
- The tail task offers the elements to the blocking queue. That means records are simply dropped if the capacity-bound queue (one element) is not polled by the head task in time.
- With the capacity bound in the feedback queue, it is pretty easy to build a full deadlock. Just use a loop function that explodes data into the feedback channel.
- Recent code also introduced the ability to change parallelism. What are the semantics here when the parallelism of the loop is changed?

Since loops did not support any fault tolerance guarantees, I guess this does improve recovery behavior. But as long as the loops can either deadlock or drop data, the hard guarantees are in the end still a bit weak. So that leaves me a bit ambivalent about what to do with this pull request.
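The data-loss concern above can be reproduced with the JDK alone. The sketch below is an illustration rather than the pull request's code: `BoundedFeedbackDemo` is a hypothetical name, and it assumes the non-blocking `BlockingQueue.offer(E)` so that the outcome is deterministic (the comment suggests the real tail task waits for a while before giving up).

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo of a capacity-bound feedback queue; not Flink code.
final class BoundedFeedbackDemo {
    // Offers `count` records to a bounded queue that nobody polls and returns
    // how many offers were rejected. A producer that ignores the boolean result
    // of offer() loses exactly that many records.
    static int droppedWhenNotPolled(int capacity, int count) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        int dropped = 0;
        for (int i = 0; i < count; i++) {
            if (!queue.offer(i)) {
                dropped++;
            }
        }
        return dropped;
    }
}
```

For example, `BoundedFeedbackDemo.droppedWhenNotPolled(1, 5)` returns 4: only the first record fits into the one-element queue. Replacing `offer` with the blocking `put` would avoid the loss, but the producer would then wait indefinitely whenever the consumer is itself blocked on the producer, which is the deadlock case mentioned in the same comment.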
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15742282#comment-15742282 ]

ASF GitHub Bot commented on FLINK-3257:
---
Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668

Exactly, these two issues do not depend on each other. No doubt loop FT is the first thing that can enable iterations in a production deployment so I would merge that first. Thank you again Gyula for looking into it :)
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15742030#comment-15742030 ]

ASF GitHub Bot commented on FLINK-3257:
---
Github user gyfora commented on the issue:

https://github.com/apache/flink/pull/1668

I think the PR looks pretty good, and it sounds fair to address termination in a later PR, as this will still greatly improve the current guarantees without making the backpressure/termination problems any worse.

+1 from me
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15741633#comment-15741633 ]

ASF GitHub Bot commented on FLINK-3257:
---
Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668

Hey again, @StephanEwen @uce. When you have 10 min can you take a look to see if this is acceptable? I would not like to leave this here for months again, it has been out way too long already. The changes are just a few and straightforward so I really encourage you to skim them at your earliest convenience. Thanks!
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15688077#comment-15688077 ]

ASF GitHub Bot commented on FLINK-3257:
---
Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668

Ok, so I am progressing this a bit independently from the termination stuff, and then we rebase to the first PR that is merged. I just changed everything and rebased to the current master. Some notable changes:

- The `StreamIterationCheckpointingITCase` is now made deterministic: it fails once after the first successful checkpoint, and the job stops after everything has been recovered appropriately.
- I am now using ListState, which is supposed to work like a charm with the RocksDB file backend. Note that with the default in-memory backend there is a high chance of running into issues, given the low memory capacity that it is given by default.
- One tricky part that can potentially be done better is the way I set the logger in the StreamIterationHead (I had to change the head op field access to `protected` in the OperatorChain).

Whenever you find time, go ahead and check it out. It passes my super-strict test, which is a good thing. :)
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15529691#comment-15529691 ]

ASF GitHub Bot commented on FLINK-3257:
---
Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668

Hey! Good to be back :). Let's fix this properly, as @StephanEwen recommended, now that there is some time.

Together with @FouadMA, we are writing a FLIP to address major loop fixes, namely termination determination and fault tolerance. The termination implementation is already in good shape in my opinion, and you can find it [here](https://github.com/senorcarbone/flink/pull/2#pullrequestreview-1929918) in case you want to take an early look. The description in the FLIP will make clear how this works in detail.

The FT update for loops will be rebased on top of the loop termination fix. We hope that you will find this good too, and btw thanks for your patience :)
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15258904#comment-15258904 ]

ASF GitHub Bot commented on FLINK-3257:
---
Github user senorcarbone commented on the pull request:

https://github.com/apache/flink/pull/1668#issuecomment-214883298

ok good to know @uce! Let me get back to it in a couple of weeks and make it right, now it is a bit impossible to find time.
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15215789#comment-15215789 ]

ASF GitHub Bot commented on FLINK-3257:
---
Github user senorcarbone commented on the pull request:

https://github.com/apache/flink/pull/1668#issuecomment-202818056

Thanks @StephanEwen and @uce for looking into it! I really appreciate it. How about the following:

1. I update this PR with the patch that uses ListState and apply some nice refactorings Gyula made.
2. I will also address all your comments and then merge this to master.
3. We start working on perfecting stream finalization on loops and backpressure deadlock elimination in separate PRs right away. These are different problems and we need to address them separately, in my view of course.
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15210494#comment-15210494 ]

ASF GitHub Bot commented on FLINK-3257:
---
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1668#issuecomment-200909639

The core idea of this is very good, also the illustration is very nice.

After an offline chat with @senorcarbone, we concluded that a remaining problem in this is currently the way it integrates with the timeout-based termination detection.

Which brings us to the point that we should (in my opinion) change the way that loops terminate. It should probably be based on end-of-stream events, to make it deterministic and not susceptible to delays.

Question is now, does it make sense to do the termination change first, and base this on top of it, or to merge this irrespective of that...
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15210328#comment-15210328 ]

ASF GitHub Bot commented on FLINK-3257:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1668#discussion_r57327419

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java ---
    @@ -17,43 +17,80 @@
     package org.apache.flink.streaming.runtime.tasks;
     
    -import java.util.concurrent.ArrayBlockingQueue;
    -import java.util.concurrent.BlockingQueue;
    -import java.util.concurrent.TimeUnit;
    -
     import org.apache.flink.annotation.Internal;
     import org.apache.flink.api.common.JobID;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
    +import org.apache.flink.runtime.state.AbstractStateBackend;
    +import org.apache.flink.runtime.state.StreamStateHandle;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
     import org.apache.flink.streaming.api.watermark.Watermark;
    -import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
     import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
    +import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
     import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
    +import org.apache.flink.types.Either;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.concurrent.ArrayBlockingQueue;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.TimeUnit;
    +
     @Internal
    -public class StreamIterationHead extends OneInputStreamTask {
    +public class StreamIterationHead extends OneInputStreamTask {
     
         private static final Logger LOG = LoggerFactory.getLogger(StreamIterationHead.class);
     
         private volatile boolean running = true;
     
    -    // -
    +    /**
    +     * A flag that is on during the duration of a checkpoint. While onSnapshot is true the iteration head has to perform
    +     * upstream backup of all records in transit within the loop.
    +     */
    +    private volatile boolean onSnapshot = false;
--- End diff --

Always accessed in lock scope, no need for `volatile`
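The review remark above can be illustrated with a minimal sketch. It assumes a flag that is only ever read or written inside a `synchronized` block on a single lock object; the class and method names (`GuardedSnapshotFlag`, `startSnapshot`, and so on) are hypothetical and not taken from the Flink code base. Under that assumption the monitor already provides the visibility guarantees, so `volatile` adds nothing.

```java
// Hypothetical sketch, not Flink code: a flag that is always accessed under one lock.
final class GuardedSnapshotFlag {

    private final Object lock = new Object();

    // No `volatile` needed: every read and write below holds `lock`,
    // and releasing/acquiring the same monitor makes writes visible to later readers.
    private boolean onSnapshot = false;

    void startSnapshot() {
        synchronized (lock) {
            onSnapshot = true;
        }
    }

    void finishSnapshot() {
        synchronized (lock) {
            onSnapshot = false;
        }
    }

    boolean isOnSnapshot() {
        synchronized (lock) {
            return onSnapshot;
        }
    }
}
```

If any access to the flag ever happened outside the lock, this reasoning would no longer hold and `volatile` (or moving that access under the lock) would be needed again.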
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15210279#comment-15210279 ]

ASF GitHub Bot commented on FLINK-3257:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1668#discussion_r57321347

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---
    @@ -450,112 +450,121 @@ else if (operator != null) {
             }
         }
    -    @Override
    -    public boolean triggerCheckpoint(final long checkpointId, final long timestamp) throws Exception {
    -        LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());
    -
    -        synchronized (lock) {
    -            if (isRunning) {
    -
    -                // since both state checkpointing and downstream barrier emission occurs in this
    -                // lock scope, they are an atomic operation regardless of the order in which they occur
    -                // we immediately emit the checkpoint barriers, so the downstream operators can start
    -                // their checkpoint work as soon as possible
    -                operatorChain.broadcastCheckpointBarrier(checkpointId, timestamp);
    -
    -                // now draw the state snapshot
    -                final StreamOperator[] allOperators = operatorChain.getAllOperators();
    -                final StreamTaskState[] states = new StreamTaskState[allOperators.length];
    +    /**
    +     * Checkpoints all operator states of the current StreamTask.
    +     * Thread-safety must be handled outside the scope of this function
    +     */
    +    protected boolean checkpointStatesInternal(final long checkpointId, long timestamp) throws Exception {
--- End diff --

When rebasing we have to double check that nothing changed in this method when calling from `triggerCheckpoint` etc.
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15210270#comment-15210270 ]

ASF GitHub Bot commented on FLINK-3257:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1668#discussion_r57320231

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---
    @@ -450,112 +450,121 @@ else if (operator != null) {
             }
         }
    -    @Override
    -    public boolean triggerCheckpoint(final long checkpointId, final long timestamp) throws Exception {
    -        LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());
    -
    -        synchronized (lock) {
    -            if (isRunning) {
    -
    -                // since both state checkpointing and downstream barrier emission occurs in this
    -                // lock scope, they are an atomic operation regardless of the order in which they occur
    -                // we immediately emit the checkpoint barriers, so the downstream operators can start
    -                // their checkpoint work as soon as possible
    -                operatorChain.broadcastCheckpointBarrier(checkpointId, timestamp);
    -
    -                // now draw the state snapshot
    -                final StreamOperator[] allOperators = operatorChain.getAllOperators();
    -                final StreamTaskState[] states = new StreamTaskState[allOperators.length];
    +    /**
    +     * Checkpoints all operator states of the current StreamTask.
    +     * Thread-safety must be handled outside the scope of this function
    +     */
    +    protected boolean checkpointStatesInternal(final long checkpointId, long timestamp) throws Exception {
--- End diff --

Regarding the JavaDocs:
- The idiomatic style is to have a short description and then a blank line for more details (the first line will be displayed as a summary in the IDE etc.)
- The `of the current StreamTask` is clear from context
- The thread-safety part should be more explicit, for instance `The caller has to make sure to call this method in scope of the task's checkpoint lock`.
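The three JavaDoc suggestions above could look roughly as follows. This is only a sketch under stated assumptions: the class `SnapshotTaskSketch`, its `lock` field and the method bodies are hypothetical stand-ins, not the actual `StreamTask` code, and the `Thread.holdsLock` check is an illustrative addition that makes the documented contract fail fast.

```java
// Hypothetical sketch, not the Flink StreamTask: shows the suggested JavaDoc shape only.
class SnapshotTaskSketch {

    private final Object lock = new Object();
    private long lastCheckpointId = -1L;

    /**
     * Checkpoints all operator states.
     *
     * <p>The caller has to make sure to call this method in scope of the
     * task's checkpoint lock.
     *
     * @param checkpointId ID of the checkpoint being drawn
     * @param timestamp    timestamp of the checkpoint
     * @return true if the snapshot was drawn
     */
    protected boolean checkpointStatesInternal(final long checkpointId, final long timestamp) {
        // Illustrative addition: enforce the contract stated in the JavaDoc.
        if (!Thread.holdsLock(lock)) {
            throw new IllegalStateException("must be called while holding the checkpoint lock");
        }
        lastCheckpointId = checkpointId;
        return true;
    }

    /** Acquires the checkpoint lock and draws the snapshot. */
    public boolean triggerCheckpoint(final long checkpointId, final long timestamp) {
        synchronized (lock) {
            return checkpointStatesInternal(checkpointId, timestamp);
        }
    }

    /** Returns the ID of the last checkpoint drawn, or -1 if none. */
    public long lastCheckpointId() {
        synchronized (lock) {
            return lastCheckpointId;
        }
    }
}
```

The first sentence is what IDEs and the generated summary tables show, so it stays short; the locking requirement goes into its own paragraph after the blank line.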
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15210263#comment-15210263 ]

ASF GitHub Bot commented on FLINK-3257:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1668#discussion_r57319737

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---
    @@ -450,112 +450,121 @@ else if (operator != null) {
             }
         }
    -    @Override
    -    public boolean triggerCheckpoint(final long checkpointId, final long timestamp) throws Exception {
    -        LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());
    -
    -        synchronized (lock) {
    -            if (isRunning) {
    -
    -                // since both state checkpointing and downstream barrier emission occurs in this
    -                // lock scope, they are an atomic operation regardless of the order in which they occur
    -                // we immediately emit the checkpoint barriers, so the downstream operators can start
    -                // their checkpoint work as soon as possible
    -                operatorChain.broadcastCheckpointBarrier(checkpointId, timestamp);
    -
    -                // now draw the state snapshot
    -                final StreamOperator[] allOperators = operatorChain.getAllOperators();
    -                final StreamTaskState[] states = new StreamTaskState[allOperators.length];
    +    /**
    +     * Checkpoints all operator states of the current StreamTask.
    +     * Thread-safety must be handled outside the scope of this function
    +     */
    +    protected boolean checkpointStatesInternal(final long checkpointId, long timestamp) throws Exception {
--- End diff --

What about naming this `drawStateSnapshot`, as in the comments? That it is internal is more or less communicated by the fact that it is a `protected` method.
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15172094#comment-15172094 ]

ASF GitHub Bot commented on FLINK-3257:
---

Github user senorcarbone commented on the pull request:

https://github.com/apache/flink/pull/1668#issuecomment-190279441

You can find an alternative version using `ListState` in the following branch: https://github.com/senorcarbone/flink/commits/ftloopsalt

I noticed that this version is noticeably **slower** than the one with custom operator state, but it can apparently support larger states. I am (ab)using the PartitionedState to store the ListState under the same key, as @gyfora suggested, since it is the only way to obtain the nice representations at the moment. It would be nice to have them available for operator state snapshots as well - @aljoscha have you thought about it?

When there is free time (after the release) it would be nice to see what @aljoscha and @StephanEwen think of the two takes as well. No hurry, just take a look when you have time!

The two annoying issues I noticed during testing, which we need to check soon, are the following:
- The overhead of transmitting and finally delivering a barrier from the `head` to its consumers increases over time (for each subsequent checkpoint). That is due to having a single queue at the beginning of the iterative part of the job: events coming from the backedge are pushed further back in the input queue. It would be nice to take events in round-robin fashion from the two input gates (iteration source, regular input). Otherwise, checkpoints in iterative jobs can take a really long time.
- We need a proper way to deal with deadlocks. I removed the part where we discard events in the tail upon timeout, since that boils down to at-most-once semantics. This PR does not solve deadlocks, but I think we should find a graceful way to tackle them. (@uce, any ideas?)
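The round-robin idea from the first issue above can be sketched as follows. This is a simplified, single-threaded illustration with plain in-memory queues; `RoundRobinInputSketch` and its methods are hypothetical names and do not correspond to Flink's input gate API. It only shows the alternation policy: a barrier arriving on the regular input is no longer stuck behind an arbitrarily long run of back-edge events.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch, not Flink's input gates: alternate between two inputs.
final class RoundRobinInputSketch<T> {

    private final Queue<T> regularInput = new ArrayDeque<>();
    private final Queue<T> feedbackInput = new ArrayDeque<>();

    // Which input gets the first chance on the next poll.
    private boolean preferFeedback = false;

    void offerRegular(T element) {
        regularInput.add(element);
    }

    void offerFeedback(T element) {
        feedbackInput.add(element);
    }

    /**
     * Returns the next element, alternating between the two inputs.
     * Falls back to the other input if the preferred one is empty;
     * returns null when both are empty.
     */
    T poll() {
        Queue<T> first = preferFeedback ? feedbackInput : regularInput;
        Queue<T> second = preferFeedback ? regularInput : feedbackInput;
        preferFeedback = !preferFeedback;
        T next = first.poll();
        return next != null ? next : second.poll();
    }
}
```

With three feedback events and two regular events queued, the elements come out interleaved (regular, feedback, regular, feedback, feedback) instead of in arrival order on a single shared queue.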
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15159365#comment-15159365 ]

ASF GitHub Bot commented on FLINK-3257:
---

Github user gyfora commented on the pull request:

https://github.com/apache/flink/pull/1668#issuecomment-187827823

I can't really add anything to the timeout question :D

As for the snapshotting, I would go with the ListState, as that potentially provides a very efficient implementation for this logging phase. (It is actually designed to handle this.) Then we don't have to worry about spilling there either.
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15159353#comment-15159353 ]

ASF GitHub Bot commented on FLINK-3257:
---

Github user senorcarbone commented on the pull request:

https://github.com/apache/flink/pull/1668#issuecomment-187825999

Thanks @tillrohrmann for the feedback! I merged `ForwardingOneInputTask` into `StreamIterationSink`.

One more thing I missed pointing out is that when the iteration timeout occurs, the `StreamIterationHead` flushes its log to its output before terminating. An alternative take would be to delay termination until the barrier arrives back to complete the snapshot. The problem with that version, even though it is correct, is that under frequent checkpoint intervals the iteration head could potentially never terminate, because it will always be in snapshotting state when the timeout occurs.

Regarding the state snapshotting, should I switch to using the ListState representation instead, or does it make no difference for the time being?
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15154090#comment-15154090 ]

ASF GitHub Bot commented on FLINK-3257:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1668#issuecomment-186174994

Great idea @senorcarbone. I also really like it :-)

I agree with @gyfora to include the logic of the `ForwardingOneInputStreamTask` in the `StreamIterationTail`.

For the spillable state we can only use the `RocksDBStateBackend` at the moment. But I think that is fine for a first iteration. Eventually, we'll also add managed memory support for the memory based state backends which will make them equally suitable for the task.
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15154030#comment-15154030 ]

ASF GitHub Bot commented on FLINK-3257:
---

Github user senorcarbone commented on the pull request:

https://github.com/apache/flink/pull/1668#issuecomment-186147597

Thanks for going through it Gyula!

I agree, the `ForwardingOneInputStreamTask` can be integrated in the `StreamIterationTail`, I will probably do that. I just wanted to make the code look less messy in the tail part, but you are right.

About the operators, the `RecordPusher` was already there (even though it was not initialised correctly). I just added the `UpstreamLogger` because I wanted it to follow the operator lifecycle. I think that, the way it is, we do not need to override `checkpointStatesInternal` to make the changes we discussed. We just need to change the operator callback method, and this could also be more robust to changes in the StreamTask and operator interfaces and default behavior. That is just my personal view, but I see your point too.

I agree with the spill buffer logic. I am only a bit confused about the output stream part (the other half of the problem): is there something already that I can use? I haven't checked recent changes. How does this work if we use the in-memory backend, for example? The blob with all the messages will be packaged and sent within the stateHandle to the job manager in a single message anyway (potentially exceeding the limits), even if we use a stream API, right?
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15154001#comment-15154001 ]

ASF GitHub Bot commented on FLINK-3257:
---

Github user gyfora commented on the pull request:

https://github.com/apache/flink/pull/1668#issuecomment-186142276

Thanks Paris, I like the idea: it is a correct modification of the original algorithm that makes it much easier to implement on Flink, at the price of buffering more records.

I have some comments on the implementation:
- Why did you introduce the ForwardingOneInputStreamTask if only the Iteration tail will implement this behaviour? I am not sure if other StreamTasks will do this in the future, so it might be better to just put the logic in the StreamIterationTail instead of adding a new abstraction.
- I think the RecordPusher and UpstreamLogger embedded operators on the head and tail tasks are not really necessary and just add complexity to debugging and understanding the code. The UpstreamLogger only makes checkpointing "easier" for the implementer, but we probably want to do some custom logic there anyway. So I would probably just override the checkpointStatesInternal method directly.

I agree that we definitely need to spill the buffered records to disk or use managed memory for this. This can be similar to the BarrierBuffer logic. We can combine this with checkpointing to an output stream so that we never have to materialize the full state.
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15152692#comment-15152692 ]

ASF GitHub Bot commented on FLINK-3257:
---

GitHub user senorcarbone opened a pull request:

    https://github.com/apache/flink/pull/1668

    [FLINK-3257] Add Exactly-Once Processing Guarantees for Iterative DataStream Jobs

# **[WIP]**

This is a first version of the adapted snapshot algorithm to support iterations. It is correct and works in practice... well, when memory capacity is enough for its logging requirements, but I am working on that, hopefully with a little help from you. Before we go into the implementation details, let me briefly describe the new algorithm.

## Algorithm

Our existing checkpoint algorithm has a very fluid and straightforward protocol. It just makes sure that all checkpoint barriers are aligned in each operator, so that all records before the barriers (pre-shot) are processed before taking a snapshot. Since waiting indefinitely for all records in-transit within a cycle of an execution graph can violate termination (a crucial liveness property), we have to... save any unprocessed records for later during the snapshot. In this take of the algorithm on Flink we assign that role to the `Iteration Head`. The steps in which this version differs from the vanilla algorithm are simply the following:

1. An `Iteration Head` receives a barrier from the system runtime (as before) and:
   - Goes into **Logging Mode**. That means that from that point on every record it receives from its `Iteration Sink` is buffered in its own operator state and **not** forwarded further until it goes back to normal mode.
   - **Forwards** the barrier to its downstream nodes (this guarantees liveness, otherwise we have a deadlock).
2. Eventually, the `Iteration Head` receives

## Example

blabla

![ftloops-topology](https://cloud.githubusercontent.com/assets/858078/13151679/7f150538-d66b-11e5-98c8-7bbe2243b810.png)

blabla

![diagram](https://cloud.githubusercontent.com/assets/858078/13151664/7361a638-d66b-11e5-94e9-64f70a8130d7.png)

## Current Implementation Details

## Open/Pending Issues

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/senorcarbone/flink ftloops

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1668.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1668

commit 38256e4c4bb00183794699027e8e4298787c66fa
Author: Paris Carbone
Date: 2016-01-19T10:18:54Z

    exactly-once processing test for stream iterations

commit dbf2625536289dc70724ef798fd02989e586d874
Author: Paris Carbone
Date: 2016-02-18T15:49:19Z

    [wip] adapt snapshot mechanism for iterative jobs
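For readers following the thread, the logging-mode behaviour described in the pull request can be sketched as a small toy model. This is only an illustration under stated assumptions, not Flink code: the class `IterationHeadSketch` and its method names are hypothetical. Because step 2 of the pull request text is cut off, the completion rule used here (finalize when the barrier comes back over the feedback edge, then emit the logged records in FIFO order) is taken from step 3 of the issue description, and the restart behaviour from its last paragraph. Only one checkpoint in flight at a time is modelled.

```python
from collections import deque


class IterationHeadSketch:
    """Toy model of an iteration head (hypothetical, not a Flink class)."""

    def __init__(self, restored_log=()):
        self.downstream = []   # everything emitted downstream, in order
        self.logging = False   # True between runtime barrier and feedback barrier
        self.log = deque()     # feedback records held back while logging
        self.snapshots = {}    # checkpoint id -> logged in-transit records
        # On restart, emit the records from the restored snapshot first.
        for record in restored_log:
            self.downstream.append(("record", record))

    def on_runtime_barrier(self, checkpoint_id):
        # Enter logging mode and forward the barrier immediately,
        # so the barrier can travel around the loop (liveness).
        self.logging = True
        self.downstream.append(("barrier", checkpoint_id))

    def on_feedback_record(self, record):
        if self.logging:
            self.log.append(record)  # buffered, becomes part of the snapshot
        else:
            self.downstream.append(("record", record))

    def on_feedback_barrier(self, checkpoint_id):
        if not self.logging:
            return  # no snapshot in progress; nothing to finalize
        # The barrier has come back around the loop: the log is complete.
        self.snapshots[checkpoint_id] = list(self.log)
        while self.log:
            self.downstream.append(("record", self.log.popleft()))  # FIFO
        self.logging = False


# Usage: one checkpoint with two records in transit on the back-edge.
head = IterationHeadSketch()
head.on_feedback_record("a")   # normal mode: forwarded directly
head.on_runtime_barrier(1)     # logging mode starts
head.on_feedback_record("b")   # logged
head.on_feedback_record("c")   # logged
head.on_feedback_barrier(1)    # snapshot finalized, log flushed
head.on_feedback_record("d")   # normal mode again

# Recovery: a fresh head replays the logged records before anything else.
restored = IterationHeadSketch(restored_log=head.snapshots[1])
```

The sketch keeps the whole log in memory, which mirrors the memory-capacity concern raised in the pull request text; it does not attempt to address it.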
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15106006#comment-15106006 ]

Márton Balassi commented on FLINK-3257:
---

No problem. It is great that you are implementing the feature. It seems a bit nasty as it goes through many layers of the code. Good luck with it!
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15105997#comment-15105997 ]

Paris Carbone commented on FLINK-3257:
--

Hey Marton. Sorry but no. I am very actively working on it and it is the very core part of my work so it would be a waste to give the task to someone else. I'd suggest you give the master student something else if it's ok.
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15105829#comment-15105829 ]

Márton Balassi commented on FLINK-3257:
---

Hey Paris, how urgent is this feature for you? I have a master student who would love to give this implementation a try if it is not an immediate task on your side.