[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15940876#comment-15940876 ]
ASF GitHub Bot commented on FLINK-3257: --------------------------------------- Github user gyfora commented on a diff in the pull request: https://github.com/apache/flink/pull/1668#discussion_r107968935 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java --- @@ -17,100 +17,164 @@ package org.apache.flink.streaming.runtime.tasks; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; - import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.io.network.api.CheckpointBarrier; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.io.RecordWriterOutput; import org.apache.flink.streaming.runtime.io.BlockingQueueBroker; +import org.apache.flink.streaming.runtime.io.RecordWriterOutput; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.types.Either; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.LinkedList; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * TODO write 
javadoc + * <p> + * - open a list state per snapshot process + * - book-keep snapshot logs + * - Clean up state when a savepoint is complete - ONLY in-transit records who do NOT belong in other snapshots + * + * @param <IN> + */ @Internal -public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> { +public class StreamIterationHead<IN> extends OneInputStreamTask<IN, IN> { private static final Logger LOG = LoggerFactory.getLogger(StreamIterationHead.class); private volatile boolean running = true; - // ------------------------------------------------------------------------ - + private volatile RecordWriterOutput<IN>[] outputs; + + private UpstreamLogger<IN> upstreamLogger; + + private Object lock; + + @Override + public void init() throws Exception { + this.lock = getCheckpointLock(); + getConfiguration().setStreamOperator(new UpstreamLogger(getConfiguration())); + operatorChain = new OperatorChain<>(this); + this.upstreamLogger = (UpstreamLogger<IN>) operatorChain.getHeadOperator(); --- End diff -- if this is the same UpstreamLogger instance that you pass 2 lines above then why not use that? :) > Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs > ------------------------------------------------------------------- > > Key: FLINK-3257 > URL: https://issues.apache.org/jira/browse/FLINK-3257 > Project: Flink > Issue Type: Improvement > Components: DataStream API > Reporter: Paris Carbone > Assignee: Paris Carbone > > The current snapshotting algorithm cannot support cycles in the execution > graph. An alternative scheme can potentially include records in-transit > through the back-edges of a cyclic execution graph (ABS [1]) to achieve the > same guarantees. 
> One straightforward implementation of ABS for cyclic graphs could work > along the following lines: > 1) When the TaskManager triggers a barrier in an IterationHead, block > its output and start an upstream backup of all records forwarded from the > respective IterationSink. > 2) The IterationSink should eventually forward the current snapshotting epoch's > barrier to the IterationSource. > 3) Upon receiving a barrier from the IterationSink, the IterationSource > should finalize the snapshot, unblock its output, emit all in-transit records > in FIFO order, and then continue its usual execution. > -- > Upon restart, the IterationSource should first emit all records from the injected > snapshot and then continue its usual execution. > Several optimisations and slight variations are possible, but > this can serve as the initial implementation. > [1] http://arxiv.org/abs/1506.08603 -- This message was sent by Atlassian JIRA (v6.3.15#6346)