[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2021-04-16 Thread Flink Jira Bot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17323732#comment-17323732
 ] 

Flink Jira Bot commented on FLINK-3257:
---

This issue is assigned but has not received an update in 7 days so it has been 
labeled "stale-assigned". If you are still working on the issue, please give an 
update and remove the label. If you are no longer working on the issue, please 
unassign so someone else may work on it. In 7 days the issue will be 
automatically unassigned.

> Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
> ---
>
> Key: FLINK-3257
> URL: https://issues.apache.org/jira/browse/FLINK-3257
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Paris Carbone
>Assignee: Paris Carbone
>Priority: Major
>  Labels: pull-request-available, stale-assigned
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The current snapshotting algorithm cannot support cycles in the execution 
> graph. An alternative scheme (ABS [1]) could include the records in transit 
> on the back-edges of a cyclic execution graph in the snapshot to achieve the 
> same guarantees.
> One straightforward implementation of ABS for cyclic graphs can work along 
> the following lines:
> 1) Upon triggering a barrier in an IterationHead from the TaskManager, 
> block its output and start an upstream backup of all records forwarded from 
> the respective IterationSink.
> 2) The IterationSink should eventually forward the current snapshotting epoch 
> barrier to the IterationSource.
> 3) Upon receiving a barrier from the IterationSink, the IterationSource 
> should finalize the snapshot, unblock its output, emit all in-transit 
> records in FIFO order, and continue the usual execution.
> --
> Upon restart, the IterationSource should emit all records from the injected 
> snapshot first and then continue its usual execution.
> Several optimisations and slight variations are possible, but 
> this can serve as the initial implementation.
> [1] http://arxiv.org/abs/1506.08603
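
The three steps and the restart rule in the description can be modelled in a few lines of plain Java. This is only a sketch under stated assumptions: `IterationHeadModel`, `onTrigger`, `onBackEdge`, `restore` and the string-typed records are hypothetical names chosen for illustration, not Flink classes, and an in-memory list stands in for the persisted snapshot. Step 2 (the sink forwarding the barrier) is represented by the caller passing `BARRIER` to `onBackEdge`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical model of an iteration head; not Flink's StreamIterationHead. */
final class IterationHeadModel {
    /** Marker standing in for a checkpoint barrier arriving on the back-edge. */
    static final String BARRIER = "<barrier>";

    private final List<String> emitted = new ArrayList<>();      // records sent downstream
    private final Deque<String> inTransit = new ArrayDeque<>();  // backup of back-edge records
    private List<String> lastSnapshot = new ArrayList<>();
    private boolean snapshotting = false;

    /** Step 1: a barrier is triggered; block output and start the backup. */
    void onTrigger() {
        snapshotting = true;
    }

    /** Handles one element arriving from the iteration sink. */
    void onBackEdge(String element) {
        if (BARRIER.equals(element)) {
            if (snapshotting) {
                // Step 3: finalize the snapshot, unblock, flush in FIFO order.
                lastSnapshot = new ArrayList<>(inTransit);
                snapshotting = false;
                while (!inTransit.isEmpty()) {
                    emitted.add(inTransit.pollFirst());
                }
            }
        } else if (snapshotting) {
            inTransit.addLast(element);  // output is blocked: log instead of emitting
        } else {
            emitted.add(element);        // usual execution
        }
    }

    /** Restart rule: emit the snapshotted in-transit records first. */
    static IterationHeadModel restore(List<String> snapshot) {
        IterationHeadModel head = new IterationHeadModel();
        head.emitted.addAll(snapshot);
        return head;
    }

    List<String> emitted() {
        return emitted;
    }

    List<String> lastSnapshot() {
        return lastSnapshot;
    }

    public static void main(String[] args) {
        IterationHeadModel head = new IterationHeadModel();
        head.onBackEdge("a");
        head.onTrigger();
        head.onBackEdge("b");
        head.onBackEdge(BARRIER);
        System.out.println(head.emitted() + " snapshot=" + head.lastSnapshot());
    }
}
```

The model keeps only one snapshot in flight; handling concurrent checkpoints would need one log per epoch, which this sketch leaves out.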



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2018-12-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16706487#comment-16706487
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

tzanko-matev commented on issue #1668: [FLINK-3257] Add Exactly-Once Processing 
Guarantees for Iterative DataStream Jobs
URL: https://github.com/apache/flink/pull/1668#issuecomment-443545153
 
 
   Hi, are there any updates on this feature?






[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278282#comment-16278282
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668
  
Well, major runtime changes are coming with FLIP-6, FLIP-15 and FLIP-16, so I 
would suggest you watch those. Loop fault tolerance (FT) will be included in 
one of these, along with other loop redesign features.




[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2017-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266364#comment-16266364
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user dikei commented on the issue:

https://github.com/apache/flink/pull/1668
  
Hi. Do we have any updates on this? :)




[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2017-03-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943328#comment-15943328
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668
  
sweet! thanks @StefanRRichter 





[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2017-03-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943325#comment-15943325
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/1668
  
For raw operator state, override 
`AbstractStreamOperator::snapshotState(StateSnapshotContext context)` inside 
your operator. Your implementation should call super first; it can then obtain 
the raw stream via `context.getRawOperatorStateOutput()`. This stream works 
like a normal output stream, except that you can also call 
`stream.startNewPartition()`. This signals that a new partition has started and 
that all previous partitions are finalized and immutable. Partitions are the 
atomic units of state redistribution; think of them as the individual elements 
in a `ListCheckpointed` state.

For restoring, override 
`AbstractStreamOperator::initializeState(StateInitializationContext context)`. 
After calling super, `context.getRawOperatorStateInputs()` provides an iterable 
with one input stream per partition that your operator should restore.
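
The partition semantics described above can be illustrated without a Flink dependency. The following is a minimal sketch, assuming a hypothetical `PartitionedStateOutput` class that only mimics the described behaviour (write like a normal stream, call `startNewPartition()` to finalize what came before, get back one input stream per partition on restore). It is not the Flink stream class, and `RawStateDemo.roundTrip` is likewise an invented helper.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical demo: snapshot one partition per slice, then restore each one. */
final class RawStateDemo {
    /** Writes every slice into its own partition and returns the per-partition sums. */
    static List<Long> roundTrip(long[][] slices) {
        PartitionedStateOutput out = new PartitionedStateOutput();
        for (long[] slice : slices) {
            out.startNewPartition();
            for (long value : slice) {
                out.writeLong(value);
            }
        }
        List<Long> sums = new ArrayList<>();
        try {
            // Restore side: one input stream per partition.
            for (InputStream in : out.closeAndGetInputs()) {
                DataInputStream data = new DataInputStream(in);
                long sum = 0;
                while (data.available() > 0) {
                    sum += data.readLong();
                }
                sums.add(sum);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sums;
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(new long[][] {{1L, 2L}, {10L}}));
    }
}

/** Hypothetical stand-in for a partitioned raw state output stream. */
final class PartitionedStateOutput {
    private final List<byte[]> finished = new ArrayList<>();
    private ByteArrayOutputStream current = null;

    /** Finalizes the previous partition (if any) and opens a new one. */
    void startNewPartition() {
        if (current != null) {
            finished.add(current.toByteArray());
        }
        current = new ByteArrayOutputStream();
    }

    /** Appends one long to the currently open partition. */
    void writeLong(long value) {
        if (current == null) {
            throw new IllegalStateException("call startNewPartition() first");
        }
        try {
            new DataOutputStream(current).writeLong(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Finalizes the last partition and returns one input stream per partition. */
    List<InputStream> closeAndGetInputs() {
        if (current != null) {
            finished.add(current.toByteArray());
            current = null;
        }
        List<InputStream> inputs = new ArrayList<>();
        for (byte[] partition : finished) {
            inputs.add(new ByteArrayInputStream(partition));
        }
        return inputs;
    }
}
```

Because each partition is restored as a unit, writing one log slice per partition keeps the records of a slice together if the state is redistributed.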




[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2017-03-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943245#comment-15943245
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user senorcarbone commented on a diff in the pull request:

https://github.com/apache/flink/pull/1668#discussion_r108166093
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
 ---
@@ -17,100 +17,164 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
+import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.types.Either;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * TODO write javadoc
+ * 
+ * - open a list state per snapshot process
+ * - book-keep snapshot logs
+ * - Clean up state when a savepoint is complete - ONLY in-transit records who do NOT belong in other snapshots
+ *
+ * @param 
+ */
 @Internal
-public class StreamIterationHead extends OneInputStreamTask 
{
+public class StreamIterationHead extends OneInputStreamTask {
 
private static final Logger LOG = LoggerFactory.getLogger(StreamIterationHead.class);
 
private volatile boolean running = true;
 
-   // 

-   
+   private volatile RecordWriterOutput[] outputs;
+
+   private UpstreamLogger upstreamLogger;
+
+   private Object lock;
+
+   @Override
+   public void init() throws Exception {
+   this.lock = getCheckpointLock();
+   getConfiguration().setStreamOperator(new UpstreamLogger(getConfiguration()));
+   operatorChain = new OperatorChain<>(this);
+   this.upstreamLogger = (UpstreamLogger) operatorChain.getHeadOperator();
--- End diff --

It's another instance, which is why I was fetching it back there. 
The OperatorChain basically deserialises and sets up another instance 
through the configuration.
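
The effect described here, that an operator stored in the configuration comes back as a different object, can be reproduced with plain Java serialization. This is a hedged sketch: `SerializingConfig` and `LoggerOperator` are hypothetical stand-ins for the stream configuration and the logging operator, and it assumes only that the configuration keeps the operator in serialized form.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical config that keeps the operator only as serialized bytes. */
final class SerializingConfig {
    private byte[] operatorBytes;

    /** Serializes the operator; the original object reference is not retained. */
    void setOperator(LoggerOperator operator) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(operator);
            out.flush();
            operatorBytes = bytes.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Deserializes a fresh operator instance on every call. */
    LoggerOperator getOperator() {
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(operatorBytes))) {
            return (LoggerOperator) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        LoggerOperator original = new LoggerOperator("upstream-logger");
        SerializingConfig config = new SerializingConfig();
        config.setOperator(original);
        // The loaded operator has equal content but is a different object.
        System.out.println(config.getOperator() == original);
    }
}

/** Hypothetical stand-in for an operator placed into the configuration. */
final class LoggerOperator implements Serializable {
    private static final long serialVersionUID = 1L;
    final String name;

    LoggerOperator(String name) {
        this.name = name;
    }
}
```

Under that assumption, the task has to fetch the instance back from the chain after setup, since the reference it passed in is not the one that will run.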



[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2017-03-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943135#comment-15943135
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user senorcarbone commented on a diff in the pull request:

https://github.com/apache/flink/pull/1668#discussion_r108152267
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
 ---
@@ -64,6 +70,22 @@ public void init() throws Exception {
super.init();
}
 
+   @Override
+   protected boolean performCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
+   LOG.debug("Starting checkpoint {} on task {}", checkpointMetaData.getCheckpointId(), getName());
+
+   synchronized (getCheckpointLock()) {
+   if (isRunning()) {
+   dataChannel.put(new Either.Right(new CheckpointBarrier(checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp(), checkpointOptions)));
+   getEnvironment().acknowledgeCheckpoint(checkpointMetaData.getCheckpointId(), checkpointMetrics);
--- End diff --

Hm, not really. I cannot think of a possible use for loaded operators in 
IterationSink tasks in the future either. 




[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2017-03-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943105#comment-15943105
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668
  
Thanks for the review @gyfora and @StephanEwen, these are very good points.

@StephanEwen it makes sense not to index or keep metadata of individual 
records in log slices, since that is extra overhead. Writing raw operator state 
makes sense, so I will do that once @StefanRRichter gives me some pointers; 
that would be great. 

Any redistribution of the checkpoint slices would violate causality, so I 
hope the "list redistribution pattern" actually keeps the set of registered 
operator states per instance intact. The garbage collection issue still 
remains, but maybe (if @StefanRRichter approves) I can add an `unregister` 
functionality to the `OperatorStateStore`.

I can also add preconfigured operators (not that they will be reused 
anywhere). That is cleaner, but I really need to see how I can get full control 
of the `task` checkpointing behaviour from the `operator` level (since the 
default task checkpointing behaviour is altered at the task level).
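
To make the causality concern concrete, here is a small sketch of a sliced log, assuming (as the pull request's `UpstreamLogger` javadoc states) that each slice is named after a checkpoint ID and that slices are discarded and replayed in FIFO order. `SlicedLogModel` and its methods are hypothetical names, and a `TreeMap` keyed by the numeric ID replaces the registered operator states.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical in-memory model of a sliced replay log; not Flink code. */
final class SlicedLogModel {
    // Keyed by numeric checkpoint ID so iteration follows FIFO order
    // (a string sort would place "10" before "9").
    private final TreeMap<Long, List<String>> slices = new TreeMap<>();

    /** Opens a new slice for the given checkpoint. */
    void createSlice(long checkpointId) {
        slices.put(checkpointId, new ArrayList<>());
    }

    /** Appends a record to the most recent slice; drops it if none is open. */
    void logRecord(String record) {
        if (!slices.isEmpty()) {
            slices.lastEntry().getValue().add(record);
        }
    }

    /** Discards the oldest slice once its checkpoint barrier has come back. */
    void discardOldestSlice() {
        slices.pollFirstEntry();
    }

    /** Returns a single view of all slices in ascending checkpoint order. */
    List<String> replayLog() {
        List<String> replay = new ArrayList<>();
        for (List<String> slice : slices.values()) {
            replay.addAll(slice);
        }
        return replay;
    }

    public static void main(String[] args) {
        SlicedLogModel log = new SlicedLogModel();
        log.createSlice(9L);
        log.logRecord("a");
        log.createSlice(10L);
        log.logRecord("b");
        System.out.println(log.replayLog());
    }
}
```

The replay is only meaningful if all slices of one instance stay together; handing slice 9 to one subtask and slice 10 to another would break the order that this model relies on.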






[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2017-03-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941700#comment-15941700
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1668#discussion_r108034229
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
 ---
@@ -64,6 +70,22 @@ public void init() throws Exception {
super.init();
}
 
+   @Override
+   protected boolean performCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
+   LOG.debug("Starting checkpoint {} on task {}", checkpointMetaData.getCheckpointId(), getName());
+
+   synchronized (getCheckpointLock()) {
+   if (isRunning()) {
+   dataChannel.put(new Either.Right(new CheckpointBarrier(checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp(), checkpointOptions)));
+   getEnvironment().acknowledgeCheckpoint(checkpointMetaData.getCheckpointId(), checkpointMetrics);
--- End diff --

Can the `IterationTailTask` contain operators as well, or is it always a 
task without operators? If it has operators, we cannot immediately acknowledge 
here, but need to delegate to superclass checkpoint method instead.
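
The distinction raised in this comment can be sketched as follows. `TaskModel` and `TailTaskModel` are hypothetical classes, not Flink's task hierarchy; the sketch assumes only what the comment says, namely that a tail task without operators may acknowledge directly, while one with operators has to go through the superclass checkpoint path so their state is snapshotted first.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical task model; not Flink's StreamTask. */
class TaskModel {
    final List<String> operators = new ArrayList<>();
    final List<String> events = new ArrayList<>();

    /** Default path: snapshot every operator, then acknowledge. */
    boolean performCheckpoint(long checkpointId) {
        for (String operator : operators) {
            events.add("snapshot " + operator + " @" + checkpointId);
        }
        events.add("ack " + checkpointId);
        return true;
    }

    public static void main(String[] args) {
        TailTaskModel tail = new TailTaskModel();
        tail.operators.add("map");
        tail.performCheckpoint(7L);
        System.out.println(tail.events);
    }
}

/** Hypothetical iteration tail that forwards the barrier over the back channel. */
final class TailTaskModel extends TaskModel {
    @Override
    boolean performCheckpoint(long checkpointId) {
        events.add("barrier " + checkpointId + " -> back channel");
        if (operators.isEmpty()) {
            // Nothing to snapshot, so an immediate acknowledgement is safe.
            events.add("ack " + checkpointId);
            return true;
        }
        // Operators present: delegate so their state is snapshotted before the ack.
        return super.performCheckpoint(checkpointId);
    }
}
```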




[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940873#comment-15940873
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user gyfora commented on a diff in the pull request:

https://github.com/apache/flink/pull/1668#discussion_r107967886
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
 ---
@@ -119,13 +183,154 @@ protected void cleanup() throws Exception {
 * Creates the identification string with which head and tail task find the shared blocking
 * queue for the back channel. The identification string is unique per parallel head/tail pair
 * per iteration per job.
-* 
-* @param jid The job ID.
-* @param iterationID The id of the iteration in the job.
+*
+* @param jid  The job ID.
+* @param iterationID  The id of the iteration in the job.
 * @param subtaskIndex The parallel subtask number
 * @return The identification string.
 */
public static String createBrokerIdString(JobID jid, String iterationID, int subtaskIndex) {
return jid + "-" + iterationID + "-" + subtaskIndex;
}
+
+   /**
+* An internal operator that solely serves as a state logging facility for persisting,
+* partitioning and restoring output logs for dataflow cycles consistently. To support concurrency,
+* logs are being sliced proportionally to the number of concurrent snapshots. This allows committed
+* output logs to be uniquely identified and cleared after each complete checkpoint.
+* 
+* The design is based on the following assumptions:
+* 
+* - A slice is named after a checkpoint ID. Checkpoint IDs are numerically ordered within an execution.
+* - Each checkpoint barrier arrives back in FIFO order, thus we discard log slices in respective FIFO order.
+* - Upon restoration the logger sorts sliced logs in the same FIFO order and returns an Iterable that
+* gives a singular view of the log.
+* 
+* TODO it seems that ListState.clear does not unregister state. We need to put a hook for that.
+*
+* @param 
+*/
+   public static class UpstreamLogger extends AbstractStreamOperator implements OneInputStreamOperator {
+
+   private final StreamConfig config;
+
+   private LinkedList> slicedLog = new LinkedList<>();
+
+   private UpstreamLogger(StreamConfig config) {
+   this.config = config;
+   }
+
+   public void logRecord(StreamRecord record) throws Exception {
+   if (!slicedLog.isEmpty()) {
+   slicedLog.getLast().add(record);
+   }
+   }
+
+   public void createSlice(String sliceID) throws Exception {
+   ListState nextSlice =
+   getOperatorStateBackend().getOperatorState(new ListStateDescriptor<>(sliceID,
+   config.getTypeSerializerOut(getUserCodeClassloader())));
+   slicedLog.addLast(nextSlice);
+   }
+
+   public void discardSlice() {
+   ListState logToEvict = slicedLog.pollFirst();
+   logToEvict.clear();
+   }
+
+   public Iterable getReplayLog() throws Exception {
+   final List logSlices = new ArrayList<>(getOperatorStateBackend().getRegisteredStateNames());
+   Collections.sort(logSlices, new Comparator() {
+   @Override
+   public int compare(String o1, String o2) {
+   return Long.valueOf(o1).compareTo(Long.valueOf(o2));
+   }
+   });
+
+   final List> wrappedIterators = new ArrayList<>();
+   for (String splitID : logSlices) {
+   wrappedIterators.add(getOperatorStateBackend()
+   .getOperatorState(new ListStateDescriptor<>(splitID,
+   config.getTypeSerializerOut(getUserCodeClassloader()))).get().iterator());
+   }
+
+   if (wrappedIterators.size() == 0) {
+   return new Iterable() {
+   @Override
+   public Iterator 
iterator() 

[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940878#comment-15940878
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user gyfora commented on a diff in the pull request:

https://github.com/apache/flink/pull/1668#discussion_r107969496
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
 ---
@@ -17,100 +17,164 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
+import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.types.Either;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * TODO write javadoc
+ * 
+ * - open a list state per snapshot process
+ * - book-keep snapshot logs
+ * - Clean up state when a savepoint is complete - ONLY in-transit records 
who do NOT belong in other snapshots
+ *
+ * @param 
+ */
 @Internal
-public class StreamIterationHead extends OneInputStreamTask 
{
+public class StreamIterationHead extends OneInputStreamTask {
 
private static final Logger LOG = 
LoggerFactory.getLogger(StreamIterationHead.class);
 
private volatile boolean running = true;
 
-   // 

-   
+   private volatile RecordWriterOutput[] outputs;
+
+   private UpstreamLogger upstreamLogger;
+
+   private Object lock;
+
+   @Override
+   public void init() throws Exception {
+   this.lock = getCheckpointLock();
+   getConfiguration().setStreamOperator(new 
UpstreamLogger(getConfiguration()));
+   operatorChain = new OperatorChain<>(this);
+   this.upstreamLogger = (UpstreamLogger) 
operatorChain.getHeadOperator();
+   }
+
@Override
protected void run() throws Exception {
-   
+
final String iterationId = getConfiguration().getIterationId();
if (iterationId == null || iterationId.length() == 0) {
throw new Exception("Missing iteration ID in the task 
configuration");
}
-   
-   final String brokerID = 
createBrokerIdString(getEnvironment().getJobID(), iterationId ,
-   
getEnvironment().getTaskInfo().getIndexOfThisSubtask());
-   
+   final String brokerID = 
createBrokerIdString(getEnvironment().getJobID(), iterationId,
+   getEnvironment().getTaskInfo().getIndexOfThisSubtask());
final long iterationWaitTime = 
getConfiguration().getIterationWaitTime();
final boolean shouldWait = iterationWaitTime > 0;
 
-   final BlockingQueue dataChannel = new 
ArrayBlockingQueue(1);
+   final BlockingQueue> dataChannel
+   = new ArrayBlockingQueue<>(1);
 
// offer the queue for the tail
BlockingQueueBroker.INSTANCE.handIn(brokerID, dataChannel);
LOG.info("Iteration head {} added feedback queue under {}", 
getName(), brokerID);
 
// do the work 
   

[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940875#comment-15940875
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user gyfora commented on a diff in the pull request:

https://github.com/apache/flink/pull/1668#discussion_r107968567
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
 ---
@@ -119,13 +183,154 @@ protected void cleanup() throws Exception {
 * Creates the identification string with which head and tail task find 
the shared blocking
 * queue for the back channel. The identification string is unique per 
parallel head/tail pair
 * per iteration per job.
-* 
-* @param jid The job ID.
-* @param iterationID The id of the iteration in the job.
+*
+* @param jid  The job ID.
+* @param iterationID  The id of the iteration in the job.
 * @param subtaskIndex The parallel subtask number
 * @return The identification string.
 */
public static String createBrokerIdString(JobID jid, String 
iterationID, int subtaskIndex) {
return jid + "-" + iterationID + "-" + subtaskIndex;
}
+
+   /**
+* An internal operator that solely serves as a state logging facility 
for persisting,
+* partitioning and restoring output logs for dataflow cycles 
consistently. To support concurrency,
+* logs are being sliced proportionally to the number of concurrent 
snapshots. This allows committed
+* output logs to be uniquely identified and cleared after each 
complete checkpoint.
+* 
+* The design is based on the following assumptions:
+* 
+* - A slice is named after a checkpoint ID. Checkpoint IDs are 
numerically ordered within an execution.
+* - Each checkpoint barrier arrives back in FIFO order, thus we 
discard log slices in respective FIFO order.
+* - Upon restoration the logger sorts sliced logs in the same FIFO 
order and returns an Iterable that
+* gives a singular view of the log.
+* 
+* TODO it seems that ListState.clear does not unregister state. We 
need to put a hook for that.
+*
+* @param 
+*/
+   public static class UpstreamLogger extends 
AbstractStreamOperator implements OneInputStreamOperator {
+
+   private final StreamConfig config;
+
+   private LinkedList> slicedLog = new 
LinkedList<>();
+
+   private UpstreamLogger(StreamConfig config) {
+   this.config = config;
+   }
+
+   public void logRecord(StreamRecord record) throws Exception 
{
+   if (!slicedLog.isEmpty()) {
+   slicedLog.getLast().add(record);
+   }
+   }
+
+   public void createSlice(String sliceID) throws Exception {
+   ListState nextSlice =
+   getOperatorStateBackend().getOperatorState(new ListStateDescriptor<>(sliceID,
+   config.getTypeSerializerOut(getUserCodeClassloader())));
+   slicedLog.addLast(nextSlice);
+   }
+
+   public void discardSlice() {
+   ListState logToEvict = 
slicedLog.pollFirst();
+   logToEvict.clear();
+   }
+
+   public Iterable getReplayLog() throws 
Exception {
+   final List<String> logSlices = new ArrayList<>(getOperatorStateBackend().getRegisteredStateNames());
+   Collections.sort(logSlices, new Comparator<String>() {
+   @Override
+   public int compare(String o1, String o2) {
+   return Long.valueOf(o1).compareTo(Long.valueOf(o2));
+   }
+   });
+
+   final List> wrappedIterators 
= new ArrayList<>();
+   for (String splitID : logSlices) {
+   wrappedIterators.add(getOperatorStateBackend()
+   .getOperatorState(new ListStateDescriptor<>(splitID,
+   config.getTypeSerializerOut(getUserCodeClassloader()))).get().iterator());
+   }
+
+   if (wrappedIterators.size() == 0) {
+   return new Iterable() {
+   @Override
+   public Iterator 
iterator() 

[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940877#comment-15940877
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user gyfora commented on a diff in the pull request:

https://github.com/apache/flink/pull/1668#discussion_r107967910
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
 ---
@@ -119,13 +183,154 @@ protected void cleanup() throws Exception {

[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940874#comment-15940874
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user gyfora commented on a diff in the pull request:

https://github.com/apache/flink/pull/1668#discussion_r107968436
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
 ---
@@ -119,13 +183,154 @@ protected void cleanup() throws Exception {

[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940876#comment-15940876
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user gyfora commented on a diff in the pull request:

https://github.com/apache/flink/pull/1668#discussion_r107968935
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
 ---
@@ -17,100 +17,164 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
+import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.types.Either;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * TODO write javadoc
+ * 
+ * - open a list state per snapshot process
+ * - book-keep snapshot logs
+ * - Clean up state when a savepoint is complete - ONLY in-transit records 
who do NOT belong in other snapshots
+ *
+ * @param 
+ */
 @Internal
-public class StreamIterationHead extends OneInputStreamTask 
{
+public class StreamIterationHead extends OneInputStreamTask {
 
private static final Logger LOG = 
LoggerFactory.getLogger(StreamIterationHead.class);
 
private volatile boolean running = true;
 
-   // 

-   
+   private volatile RecordWriterOutput[] outputs;
+
+   private UpstreamLogger upstreamLogger;
+
+   private Object lock;
+
+   @Override
+   public void init() throws Exception {
+   this.lock = getCheckpointLock();
+   getConfiguration().setStreamOperator(new 
UpstreamLogger(getConfiguration()));
+   operatorChain = new OperatorChain<>(this);
+   this.upstreamLogger = (UpstreamLogger) 
operatorChain.getHeadOperator();
--- End diff --

If this is the same UpstreamLogger instance that you pass 2 lines above, 
then why not use that? :)


> Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
> ---
>
> Key: FLINK-3257
> URL: https://issues.apache.org/jira/browse/FLINK-3257
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Paris Carbone
>Assignee: Paris Carbone
>
> The current snapshotting algorithm cannot support cycles in the execution 
> graph. An alternative scheme can potentially include records in-transit 
> through the back-edges of a cyclic execution graph (ABS [1]) to achieve the 
> same guarantees.
> One straightforward implementation of ABS for cyclic graphs can work as 
> follows along the lines:
> 1) Upon triggering a barrier in an IterationHead from the TaskManager start 
> block output and start upstream backup of all records forwarded from the 
> respective IterationSink.
> 2) The IterationSink should eventually forward the current snapshotting epoch 
> barrier to the IterationSource.
> 3) Upon receiving a barrier from the IterationSink, the IterationSource 
> should finalize the snapshot, unblock its output and emit all records 
> in-transit in FIFO order and continue the usual execution.
> --
> Upon restart the IterationSource should emit all 

[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940777#comment-15940777
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668
  
Hey. Any update/opinion/something anyone?
Just a gentle reminder, sorry if this sounds a bit desperate :)



> Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
> ---
>
> Key: FLINK-3257
> URL: https://issues.apache.org/jira/browse/FLINK-3257
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Paris Carbone
>Assignee: Paris Carbone
>
> The current snapshotting algorithm cannot support cycles in the execution 
> graph. An alternative scheme can potentially include records in-transit 
> through the back-edges of a cyclic execution graph (ABS [1]) to achieve the 
> same guarantees.
> One straightforward implementation of ABS for cyclic graphs can work as 
> follows along the lines:
> 1) Upon triggering a barrier in an IterationHead from the TaskManager start 
> block output and start upstream backup of all records forwarded from the 
> respective IterationSink.
> 2) The IterationSink should eventually forward the current snapshotting epoch 
> barrier to the IterationSource.
> 3) Upon receiving a barrier from the IterationSink, the IterationSource 
> should finalize the snapshot, unblock its output and emit all records 
> in-transit in FIFO order and continue the usual execution.
> --
> Upon restart the IterationSource should emit all records from the injected 
> snapshot first and then continue its usual execution.
> Several optimisations and slight variations can be potentially achieved but 
> this can be the initial implementation take.
> [1] http://arxiv.org/abs/1506.08603
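
The three steps and the restart rule quoted above can be sketched as a toy model. This is only an illustration under stated assumptions: the class IterationHeadSketch and all of its method names are hypothetical, plain in-memory lists stand in for Flink's channels and state, and it is not the code from the pull request.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of an iteration head; plain lists stand in for channels and state. */
final class IterationHeadSketch<T> {

    private final List<T> backup = new ArrayList<>();  // records logged during a snapshot
    private final List<T> emitted = new ArrayList<>(); // stands in for the downstream output
    private boolean snapshotInProgress = false;

    /** Step 1: a barrier is triggered, so block the output and start the backup. */
    void onBarrierTriggered() {
        snapshotInProgress = true;
    }

    /** A record arriving over the back-edge from the tail. */
    void onFeedbackRecord(T record) {
        if (snapshotInProgress) {
            backup.add(record);   // in transit while the snapshot is open
        } else {
            emitted.add(record);  // usual execution
        }
    }

    /** Step 3: the barrier came back; finalize, unblock and flush in FIFO order. */
    List<T> onBarrierFromTail() {
        List<T> snapshot = new ArrayList<>(backup); // what would be persisted
        emitted.addAll(backup);
        backup.clear();
        snapshotInProgress = false;
        return snapshot;
    }

    /** On restart: replay the records of the injected snapshot first. */
    void restore(List<T> snapshot) {
        emitted.addAll(snapshot);
    }

    List<T> emitted() {
        return emitted;
    }
}
```

Step 2 (the tail forwarding the barrier) is represented only by the call to onBarrierFromTail; a real head would receive the barrier over the same feedback queue as the records.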



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926037#comment-15926037
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668
  
I have just rebased to the current master.
Please let me wrap this up. It has been over a year :)

Unregistering state in the OperatorStateStore is a very tiny fix.
@StephanEwen @StefanRRichter Is it ok with you to make this small addition 
in this PR or should I create a separate issue? Hope you have a few spare 
minutes to take a quick look this time.




[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905044#comment-15905044
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668
  
Hey @addisonj. 
Sure! You could perhaps review the changes and maybe see how to discard 
empty operator states if you are motivated. This is the only pending issue for 
this PR. Thanks!




[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15904089#comment-15904089
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user addisonj commented on the issue:

https://github.com/apache/flink/pull/1668
  
Very interested in this work. It sounds like there are a few loose ends and 
then some cleanup before it might be ready for merge. @senorcarbone or 
@StephanEwen, is there anything that someone else could help with? Would love 
to help wherever possible.




[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2017-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15829678#comment-15829678
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668
  
The last update implements a variant of what @StephanEwen proposes. We have 
put some more thought into this offline too, thanks to @gyfora! The idea is that 
instead of putting records to each `ListState`, the output log is partitioned 
into multiple log "slices", one per concurrent checkpoint.

More specifically, the `UpstreamLogger` operator at the `IterationHead` 
slices logs proportionally to the number of concurrent snapshots. This also 
allows committed output logs to be uniquely identified and cleared after each 
complete checkpoint. The design is based on the following assumptions:

- A slice is named after a checkpoint ID. Checkpoint IDs are numerically 
ordered within an execution.
- Each checkpoint barrier arrives back in FIFO order, thus we discard log 
slices in respective FIFO order.
- Upon restoration the logger sorts sliced logs in the same FIFO order and 
returns an Iterable that gives a singular view of the log.
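
A minimal sketch of the slicing scheme described above, assuming plain in-memory lists in place of `ListState` and a `TreeMap` in place of the registered state names. The class SlicedLogSketch and its method names are hypothetical and only mirror the operations named in this comment; this is not the PR's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** In-memory stand-in for the sliced upstream log; not the Flink state API. */
final class SlicedLogSketch<T> {

    // Slices keyed by checkpoint ID; the TreeMap keeps them in numeric order,
    // so slice 9 sorts before slice 10 (a plain string sort would not).
    private final TreeMap<Long, List<T>> slices = new TreeMap<>();

    /** Opens a new slice when the barrier for checkpointId is triggered. */
    void createSlice(long checkpointId) {
        slices.put(checkpointId, new ArrayList<>());
    }

    /** Appends a feedback record to the newest open slice, if there is one. */
    void logRecord(T record) {
        if (!slices.isEmpty()) {
            slices.lastEntry().getValue().add(record);
        }
    }

    /** Drops the oldest slice once its barrier has returned (FIFO order). */
    void discardSlice() {
        slices.pollFirstEntry();
    }

    /** Returns one flat view over all remaining slices, oldest first. */
    List<T> replayLog() {
        List<T> out = new ArrayList<>();
        for (List<T> slice : slices.values()) {
            out.addAll(slice);
        }
        return out;
    }
}
```

Keying the slices by a numeric ID is what makes the FIFO assumption cheap to rely on: discarding the oldest slice and replaying in order are both ordered-map operations.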

Before I polish this we need to close a memory leak. The `clear` operation 
of `State` cleans the state under the registered id but it does not seem to 
unregister the key itself. Does anyone have an idea on how to unregister state 
properly? Hope this gets some attention to wrap it up, it's been too long :). 
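
To make the leak concrete, here is a small sketch of the difference between clearing a state and unregistering it. Everything in it is an assumption for illustration: StateRegistrySketch is a hypothetical map-backed registry, and `unregister` is an invented method, not an existing Flink call.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical registry of named list states, backed by a plain map. */
final class StateRegistrySketch {

    private final Map<String, List<String>> registered = new HashMap<>();

    /** Registers the state under the given name on first access. */
    List<String> getOrCreate(String name) {
        return registered.computeIfAbsent(name, k -> new ArrayList<>());
    }

    /** Empties the state but leaves its name registered. */
    void clear(String name) {
        List<String> state = registered.get(name);
        if (state != null) {
            state.clear();
        }
    }

    /** Hypothetical hook: removes the name itself, so nothing accumulates. */
    void unregister(String name) {
        registered.remove(name);
    }

    Set<String> registeredNames() {
        return registered.keySet();
    }
}
```

With one slice per checkpoint, calling only `clear` leaves one empty entry behind for every completed checkpoint, which is why the set of registered names would grow without bound.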





[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15773700#comment-15773700
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668
  
@StephanEwen could you check my question above? 


> Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
> ---
>
> Key: FLINK-3257
> URL: https://issues.apache.org/jira/browse/FLINK-3257
> Project: Flink
>  Issue Type: Improvement
>Reporter: Paris Carbone
>Assignee: Paris Carbone
>
> The current snapshotting algorithm cannot support cycles in the execution 
> graph. An alternative scheme can potentially include records in-transit 
> through the back-edges of a cyclic execution graph (ABS [1]) to achieve the 
> same guarantees.
> One straightforward implementation of ABS for cyclic graphs can work as 
> follows along the lines:
> 1) Upon triggering a barrier in an IterationHead from the TaskManager start 
> block output and start upstream backup of all records forwarded from the 
> respective IterationSink.
> 2) The IterationSink should eventually forward the current snapshotting epoch 
> barrier to the IterationSource.
> 3) Upon receiving a barrier from the IterationSink, the IterationSource 
> should finalize the snapshot, unblock its output and emit all records 
> in-transit in FIFO order and continue the usual execution.
> --
> Upon restart the IterationSource should emit all records from the injected 
> snapshot first and then continue its usual execution.
> Several optimisations and slight variations can be potentially achieved but 
> this can be the initial implementation take.
> [1] http://arxiv.org/abs/1506.08603





[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2016-12-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15754312#comment-15754312
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668
  
Working on it atm. I decided to make the following optimisations, but I want 
to quickly make sure that async checkpointing works the way I believe it 
does:
- Most importantly, I am changing the iteration head to always forward 
records. Their effects are not present in any in-progress snapshot anyway, so 
I should have done that from the very beginning. :)
- If `ListState` is checkpointed asynchronously (depending on the backend, I 
suppose), then the version of it that is current during the snapshot will be 
persisted as a copy. That means we can apply mutations right away and 
therefore reset it, right after invoking the snapshot, to the beginning of the 
next in-progress snapshot (some indexing involved). That way we do not need to 
open new ListStates in the first place. Does this make sense?

@StephanEwen Please correct me if I am wrong regarding the second point. I 
am just not very familiar with async snapshotting for `ListState` (the 
documentation is not clear to me on this). Mind also that I do not use the 
`CheckpointedAsynchronously` interface, since it seems to be heading towards 
deprecation. Thanks!
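
The second point can be sketched in plain Java as follows. This is only an illustration of the assumption being asked about (the snapshot persists a copy, so the live state can be reset immediately); `CopyOnSnapshotLog` and its methods are hypothetical names, an `ArrayList` stands in for `ListState`, and whether a given Flink state backend actually behaves this way is exactly the open question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a ListState whose snapshot is taken as a copy.
final class CopyOnSnapshotLog<T> {
    private final List<T> live = new ArrayList<>();

    void add(T record) {
        live.add(record);
    }

    // Synchronous part: copy the current contents and reset the live list.
    // Asynchronous part: "persist" the copy on a background thread.
    CompletableFuture<List<T>> snapshotAndReset() {
        List<T> copy = new ArrayList<>(live);
        live.clear(); // safe only because the snapshot works on the copy
        return CompletableFuture.supplyAsync(() -> copy);
    }

    int liveSize() {
        return live.size();
    }
}
```

Records added after `snapshotAndReset()` returns belong to the next in-progress snapshot and never show up in the copy that is being persisted.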




[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2016-12-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15745100#comment-15745100
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668
  
agreed @StephanEwen! I will do that. 




[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2016-12-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15744752#comment-15744752
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/1668
  
@senorcarbone I agree, let's fix the multiple checkpoints issue and do the 
rest in FLIP-15.

The other operators have a pretty simple way of doing this:
  - for synchronously checkpointed operators, there is no need to do anything: 
the synchronous part of one checkpoint is over when the next starts (because it 
is synchronous ;-))
  - for asynchronously checkpointed state, the state backend needs to be 
able to hold multiple snapshots, which are saved by multiple background threads
  - none of the operators deal with in-flight data, which makes their job 
easy

Dealing with in-flight data probably means that you need to open a 
ListState for each checkpoint that arrives and add the feedback values to each 
state until that particular checkpoint's barrier comes back through the 
feedback channel. I think that should be sufficient.
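
A minimal Java sketch of the per-checkpoint bookkeeping described above, assuming plain in-memory lists in place of Flink's `ListState`. The class `FeedbackLogs` and its method names are hypothetical and do not come from the pull request or the Flink API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: one log of in-flight feedback records per pending checkpoint.
final class FeedbackLogs<T> {
    // Checkpoint id -> feedback records seen since that checkpoint was triggered.
    private final Map<Long, List<T>> pending = new LinkedHashMap<>();

    // A checkpoint arrived at the iteration head: open a new log for it.
    void onCheckpointTriggered(long checkpointId) {
        pending.put(checkpointId, new ArrayList<>());
    }

    // A record arrived on the feedback channel: it is in flight for every
    // checkpoint whose barrier has not come back yet.
    void onFeedbackRecord(T record) {
        for (List<T> log : pending.values()) {
            log.add(record);
        }
    }

    // The barrier of this checkpoint came back through the feedback channel:
    // its log is complete and can be handed to the snapshot.
    List<T> onBarrierReturned(long checkpointId) {
        List<T> log = pending.remove(checkpointId);
        return log == null ? new ArrayList<>() : log;
    }

    int pendingCheckpoints() {
        return pending.size();
    }
}
```

With two overlapping checkpoints, a record that arrives while both are pending is logged for both, which is what lets a savepoint run concurrently with a regular checkpoint.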





[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2016-12-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15744648#comment-15744648
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668
  
These are some good points @StephanEwen, thanks for checking it. 
How about the following, regarding each issue:

- `Concurrent Checkpoints`: Looks like an improvement, but I can certainly do 
it in this PR if it is a crucial one. Can you elaborate a bit more, or point me 
to other examples of operator state with concurrent checkpointing, so I get an 
idea of how you want to do it?
- `Reconfiguration`: Sounds interesting, but I am not really aware of it 
from the dev list. If it is simple enough I could add support for it here. 
Otherwise I would suggest we address this in a separate JIRA and PR as an 
improvement. Is there a design document somewhere on how we plan to achieve 
reconfiguration and repartitioning for operator state specifically?
- `At-most-once blocking queue`: It is obvious from my previous comments 
that I do not approve of this part, but it is something we already got rid of 
in 
[FLIP-15](https://cwiki.apache.org/confluence/display/FLINK/FLIP-15+Scoped+Loops+and+Job+Termination)
 
([this](https://github.com/FouadMA/flink/commit/9adaac435bcaf3552afe564c739d4e8fd79c433b)
 commit). How about we address this together with the deadlocks in FLIP-15?
- `Deadlocks`: I like the elastic spilling channel idea to resolve 
deadlocks. I need time to dig a bit more into this and make sure we solve 
deadlocks and not just improve the situation. Is it ok with you if we address 
that in 
[FLIP-15](https://cwiki.apache.org/confluence/display/FLINK/FLIP-15+Scoped+Loops+and+Job+Termination)?
 I need more time for this part; plus, we need to combine the absence of 
expiring queues with a proper termination algorithm (otherwise we just solve 
the deadlocks and the jobs never terminate).

What do you think?





[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2016-12-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15743344#comment-15743344
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/1668
  
To suggest some way to fix the guarantees: To my mind, the crux lies in the 
way that the feedback channel is implemented - a simple blocking queue just 
does not cut it for that case. To do this properly, I think we need to do the 
following:
  - Have an elastic feedback channel (unbounded) with a certain memory 
budget, that can spill if needed. I think it would best be implemented by 
holding the data in serialized form.
  - On checkpoint, one simply adds the feedback channel data (already 
bytes) to the checkpoint
  - The source task should probably prioritize reading from the feedback 
channel, to keep it always as small as possible.
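
One way to picture such a channel is the Java sketch below. It is an assumption-laden illustration, not the design from the pull request or FLIP-15: records are taken as already-serialized byte arrays, overflow goes to a temporary file, and the class name `SpillingFeedbackChannel` and all of its methods are hypothetical.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.util.ArrayDeque;

// Hypothetical unbounded FIFO channel: keeps serialized records in memory up to
// a byte budget and spills the overflow to a temporary file.
final class SpillingFeedbackChannel implements AutoCloseable {
    private final long memoryBudgetBytes;
    private final ArrayDeque<byte[]> inMemory = new ArrayDeque<>();
    private final File spillFile;
    private final RandomAccessFile spill;
    private long inMemoryBytes = 0;
    private long readPos = 0;  // offset of the next spilled record to read
    private long writePos = 0; // end of the spilled data

    SpillingFeedbackChannel(long memoryBudgetBytes) {
        this.memoryBudgetBytes = memoryBudgetBytes;
        try {
            spillFile = File.createTempFile("feedback", ".spill");
            spillFile.deleteOnExit();
            spill = new RandomAccessFile(spillFile, "rw");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Never blocks and never drops. Once anything is spilled, later records are
    // spilled too, so that FIFO order is preserved.
    void add(byte[] record) {
        try {
            if (!hasSpilled() && inMemoryBytes + record.length <= memoryBudgetBytes) {
                inMemory.addLast(record);
                inMemoryBytes += record.length;
            } else {
                spill.seek(writePos);
                spill.writeInt(record.length); // length-prefixed record
                spill.write(record);
                writePos = spill.getFilePointer();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns the oldest record, or null if the channel is empty.
    byte[] poll() {
        byte[] head = inMemory.pollFirst();
        if (head != null) {
            inMemoryBytes -= head.length;
            return head;
        }
        if (!hasSpilled()) {
            return null;
        }
        try {
            spill.seek(readPos);
            byte[] record = new byte[spill.readInt()];
            spill.readFully(record);
            readPos = spill.getFilePointer();
            if (readPos == writePos) { // file drained: reuse it from the start
                readPos = 0;
                writePos = 0;
                spill.setLength(0);
            }
            return record;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    boolean hasSpilled() {
        return readPos < writePos;
    }

    @Override
    public void close() {
        try {
            spill.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        spillFile.delete();
    }
}
```

Because `add` never blocks, a loop body that emits many records per input cannot stall the tail task on the feedback edge; the cost is disk usage, which is why draining the channel with priority keeps it small.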





[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2016-12-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15743343#comment-15743343
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/1668
  
Thanks for the reminder, I went over the code today. The code looks mostly 
good, but here are some thoughts:

  - The head task supports only one concurrent checkpoint. In general, the 
tasks need to support multiple checkpoints being in progress at the same time. 
This frequently happens when people trigger savepoints concurrently with a 
running checkpoint. I think that is important to support.

  - The tail task offers the elements to the blocking queue. That means 
records are simply dropped if the capacity-bounded queue (one element) is not 
polled by the head task in time.

  - With the capacity bound in the feedback queue, it is pretty easy to 
build a full deadlock. Just use a loop function that explodes data into the 
feedback channel.

  - Recent code also introduced the ability to change parallelism. What are 
the semantics here when the parallelism of the loop is changed?

Since loops did not support any fault tolerance guarantees, I guess this 
does improve recovery behavior. But as long as the loops can either deadlock or 
drop data, the hard guarantees are in the end still a bit weak. So that leaves 
me a bit ambivalent about what to do with this pull request.
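
The data-loss point can be reproduced with the JDK alone. The sketch below is an assumption about the shape of the problem, not the code under review: it uses a non-blocking `offer` on an `ArrayBlockingQueue` of capacity one that nobody polls, and `BoundedFeedbackDemo` is a hypothetical name.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo of a one-element feedback queue that is never polled.
final class BoundedFeedbackDemo {
    // Offers `count` records and returns how many were rejected, i.e. dropped.
    static int droppedWithOffer(int count) {
        BlockingQueue<Integer> feedback = new ArrayBlockingQueue<>(1);
        int dropped = 0;
        for (int i = 0; i < count; i++) {
            // offer() returns false instead of waiting when the queue is full.
            if (!feedback.offer(i)) {
                dropped++;
            }
        }
        return dropped;
    }
}
```

Switching to the blocking `put` avoids the drop but makes the tail wait on the head, which is the deadlock scenario from the third bullet once the head is itself blocked on its output.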





[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2016-12-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742282#comment-15742282
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668
  
Exactly, these two issues do not depend on each other.  No doubt loop FT is 
the first thing that can enable iterations in a production deployment so I 
would merge that first.

Thank you again Gyula for looking into it :)





[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2016-12-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15741633#comment-15741633
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668
  
Hey again, @StephanEwen @uce .
When you have 10 min can you take a look to see if this is acceptable?  
I would not like to leave this here for months again, it has been out way 
too long already. 

The changes are just a few and straightforward so I really encourage you to 
skim them at your earliest convenience. Thanks!




[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688077#comment-15688077
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668
  
Ok, so I am progressing this a bit independently from the termination stuff, 
and then we rebase onto whichever PR is merged first. I just changed everything 
and rebased to the current master. 

Some notable changes:
- The `StreamIterationCheckpointingITCase` is now made deterministic: it 
fails once after the first successful checkpoint, and the job stops after 
everything has been recovered appropriately.
- I am now using ListState, which is supposed to work like a charm with the 
rocksdb file backend. Note that with the default in-memory backend there is a 
high chance of running into issues, given the low memory capacity it is given 
by default.
- One tricky part that could potentially be done better is the way I set the 
logger in the StreamIterationHead (had to change the head op field access to 
`protected` in the OperatorChain)

Whenever you find time go ahead and check it out. It passes my super-strict 
test which is a good thing. :)




[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15529691#comment-15529691
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668
  
Hey! Good to be back :) . Let's fix this properly, as @StephanEwen 
recommended, now that there is some time.
Together with @FouadMA, we are writing a FLIP to address major loop fixes, 
namely termination determination and fault tolerance. The termination 
implementation is already in good shape in my opinion, and you can find it 
[here](https://github.com/senorcarbone/flink/pull/2#pullrequestreview-1929918) 
in case you want to take an early look. The description in the FLIP will make 
clear how this works in detail. 

The FT update for loops will be rebased on top of the loop termination fix.
We hope that you will find this good too, and btw thanks for your patience :)




[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2016-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15258904#comment-15258904
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user senorcarbone commented on the pull request:

https://github.com/apache/flink/pull/1668#issuecomment-214883298
  
ok good to know @uce! Let me get back to it in a couple of weeks and make 
it right; right now it is nearly impossible to find time.




[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2016-03-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15215789#comment-15215789
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user senorcarbone commented on the pull request:

https://github.com/apache/flink/pull/1668#issuecomment-202818056
  
Thanks @StephanEwen and @uce for looking into it! I really appreciate it. 
How about the following:

1. I update this PR with the patch that uses ListState and apply some nice 
refactorings Gyula made
2. I will also address all your comments and then merge this to master
3. We start working on perfecting stream finalization on loops and 
backpressure deadlock elimination in separate PRs right away. These are 
different problems and we need to address them separately, in my view of course.




[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2016-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15210494#comment-15210494
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1668#issuecomment-200909639
  
The core idea of this is very good, and the illustration is very nice.

After an offline chat with @senorcarbone, we concluded that a remaining 
problem in this is currently the way it integrates with the timeout-based 
termination detection.

Which brings us to the point that we should (in my opinion) change the way 
that loops terminate. It should probably be based on end-of-stream events, to 
make it deterministic and not susceptible to delays.

Question is now, does it make sense to do the termination change first, and 
base this on top of it, or to merge this irrespective of that...
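
To make the contrast concrete, here is a minimal sketch of the two termination styles over a feedback queue. It assumes a hypothetical `END_OF_STREAM` marker object and hypothetical method names; it is not how Flink implements either variant.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

final class LoopTerminationSketch {
    /** Hypothetical end-of-stream event placed on the feedback queue. */
    static final String END_OF_STREAM = "<EOS>";

    /** Timeout-based: gives up as soon as no record arrives within the timeout. */
    static List<String> drainWithTimeout(BlockingQueue<String> feedback, long timeoutMillis)
            throws InterruptedException {
        List<String> seen = new ArrayList<>();
        while (true) {
            String next = feedback.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (next == null) {
                return seen; // nothing arrived in time: the loop is assumed to be finished
            }
            seen.add(next);
        }
    }

    /** Event-based: blocks until the explicit end-of-stream event arrives. */
    static List<String> drainUntilEndOfStream(BlockingQueue<String> feedback)
            throws InterruptedException {
        List<String> seen = new ArrayList<>();
        while (true) {
            String next = feedback.take();
            if (END_OF_STREAM.equals(next)) {
                return seen; // deterministic: termination depends only on the event
            }
            seen.add(next);
        }
    }
}
```

The timeout variant can stop early whenever a record is merely delayed for longer than the timeout, whereas the event-based variant terminates only on the marker, independent of timing.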




[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2016-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15210328#comment-15210328
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1668#discussion_r57327419
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
 ---
@@ -17,43 +17,80 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
+import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.types.Either;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
 @Internal
-public class StreamIterationHead extends OneInputStreamTask {
+public class StreamIterationHead extends OneInputStreamTask {
 
private static final Logger LOG = LoggerFactory.getLogger(StreamIterationHead.class);
 
private volatile boolean running = true;
 
-   // 

-   
+   /**
+    * A flag that is on during the duration of a checkpoint. While onSnapshot is true the iteration head has to perform
+    * upstream backup of all records in transit within the loop.
+    */
+   private volatile boolean onSnapshot = false;
--- End diff --

Always accessed in lock scope, no need for `volatile`
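
As a minimal illustration of that point (hypothetical class and method names, not the code under review): when every read and write of a field happens while holding the same monitor, releasing and re-acquiring that monitor already guarantees visibility between threads, so `volatile` adds nothing.

```java
final class SnapshotFlagSketch {
    private final Object lock = new Object();

    // No `volatile` here: the field is only touched inside synchronized (lock) blocks,
    // and the monitor's release/acquire ordering makes each write visible to later readers.
    private boolean onSnapshot = false;

    void beginSnapshot() {
        synchronized (lock) {
            onSnapshot = true;
        }
    }

    void endSnapshot() {
        synchronized (lock) {
            onSnapshot = false;
        }
    }

    boolean isOnSnapshot() {
        synchronized (lock) {
            return onSnapshot;
        }
    }
}
```

The guarantee only holds if no code path reads or writes the flag outside the lock; a single unsynchronized access would bring the visibility question back.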




[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2016-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15210279#comment-15210279
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1668#discussion_r57321347
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ---
@@ -450,112 +450,121 @@ else if (operator != null) {
}
}
 
-   @Override
-   public boolean triggerCheckpoint(final long checkpointId, final long timestamp) throws Exception {
-       LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());
-
-       synchronized (lock) {
-           if (isRunning) {
-
-               // since both state checkpointing and downstream barrier emission occurs in this
-               // lock scope, they are an atomic operation regardless of the order in which they occur
-               // we immediately emit the checkpoint barriers, so the downstream operators can start
-               // their checkpoint work as soon as possible
-               operatorChain.broadcastCheckpointBarrier(checkpointId, timestamp);
-
-               // now draw the state snapshot
-               final StreamOperator[] allOperators = operatorChain.getAllOperators();
-               final StreamTaskState[] states = new StreamTaskState[allOperators.length];
+   /**
+    * Checkpoints all operator states of the current StreamTask.
+    * Thread-safety must be handled outside the scope of this function
+    */
+   protected boolean checkpointStatesInternal(final long checkpointId, long timestamp) throws Exception {
--- End diff --

When rebasing we have to double check that nothing changed in this method 
when calling from `triggerCheckpoint` etc.




[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2016-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15210270#comment-15210270
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1668#discussion_r57320231
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ---
@@ -450,112 +450,121 @@ else if (operator != null) {
}
}
 
-   @Override
-   public boolean triggerCheckpoint(final long checkpointId, final long timestamp) throws Exception {
-       LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());
-
-       synchronized (lock) {
-           if (isRunning) {
-
-               // since both state checkpointing and downstream barrier emission occurs in this
-               // lock scope, they are an atomic operation regardless of the order in which they occur
-               // we immediately emit the checkpoint barriers, so the downstream operators can start
-               // their checkpoint work as soon as possible
-               operatorChain.broadcastCheckpointBarrier(checkpointId, timestamp);
-
-               // now draw the state snapshot
-               final StreamOperator[] allOperators = operatorChain.getAllOperators();
-               final StreamTaskState[] states = new StreamTaskState[allOperators.length];
+   /**
+    * Checkpoints all operator states of the current StreamTask.
+    * Thread-safety must be handled outside the scope of this function
+    */
+   protected boolean checkpointStatesInternal(final long checkpointId, long timestamp) throws Exception {
--- End diff --

Regarding the JavaDocs:
- The idiomatic style is to have a short description and then a blank line 
for more details (the first line will be displayed as a summary in the IDE etc.)
- The `of the current StreamTask` is clear from context
- The Thread-safety part should be more explicit, for instance `The caller 
has to make sure to call this method in scope of the task's checkpoint lock`.
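
Applied to the method in question, the three suggestions could look roughly like this. The class below is a stand-alone sketch with hypothetical names (`JavadocStyleSketch`, `checkpointLock`) and a placeholder body; the `Thread.holdsLock` check is an extra illustration of the documented contract, not something the review asked for.

```java
final class JavadocStyleSketch {
    private final Object checkpointLock = new Object();

    /**
     * Checkpoints all operator states.
     *
     * <p>The caller has to make sure to call this method in scope of the
     * task's checkpoint lock.
     */
    boolean checkpointStatesInternal(long checkpointId, long timestamp) {
        if (!Thread.holdsLock(checkpointLock)) {
            throw new IllegalStateException("checkpoint lock not held");
        }
        return true; // placeholder: a real implementation would draw the state snapshot here
    }

    /** Triggers a checkpoint while holding the checkpoint lock. */
    boolean triggerCheckpoint(long checkpointId, long timestamp) {
        synchronized (checkpointLock) {
            return checkpointStatesInternal(checkpointId, timestamp);
        }
    }
}
```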




[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2016-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15210263#comment-15210263
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1668#discussion_r57319737
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ---
@@ -450,112 +450,121 @@ else if (operator != null) {
}
}
 
-   @Override
-   public boolean triggerCheckpoint(final long checkpointId, final long timestamp) throws Exception {
-       LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());
-
-       synchronized (lock) {
-           if (isRunning) {
-
-               // since both state checkpointing and downstream barrier emission occurs in this
-               // lock scope, they are an atomic operation regardless of the order in which they occur
-               // we immediately emit the checkpoint barriers, so the downstream operators can start
-               // their checkpoint work as soon as possible
-               operatorChain.broadcastCheckpointBarrier(checkpointId, timestamp);
-
-               // now draw the state snapshot
-               final StreamOperator[] allOperators = operatorChain.getAllOperators();
-               final StreamTaskState[] states = new StreamTaskState[allOperators.length];
+   /**
+    * Checkpoints all operator states of the current StreamTask.
+    * Thread-safety must be handled outside the scope of this function
+    */
+   protected boolean checkpointStatesInternal(final long checkpointId, long timestamp) throws Exception {
--- End diff --

What about naming this as in the comments `drawStateSnapshot`? That it is 
internal is more or less communicated by the fact that it is a `protected` 
method.




[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2016-02-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15172094#comment-15172094
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user senorcarbone commented on the pull request:

https://github.com/apache/flink/pull/1668#issuecomment-190279441
  
You can find an alternative version using `ListState` in the following 
branch:
https://github.com/senorcarbone/flink/commits/ftloopsalt
I noticed that this version is considerably **slower** than the one with custom 
operator state, but it can apparently support larger states.

I am (ab)using the PartitionedState to store the ListState in the same key, 
as @gyfora suggested since it is the only way to obtain the nice 
representations at the moment. It would be nice to have them available for 
operator state snapshots as well - @aljoscha have you thought about it? When 
there is free time (after the release) it would be nice to see what @aljoscha 
and @StephanEwen think of the two takes as well. No hurries, just take a look 
when you have time!

The two annoying issues I noticed during testing and we need to check soon 
are the following:

- The overhead of transmitting and finally delivering a barrier from the 
`head` to its consumers increases over time (for each subsequent checkpoint). 
That is due to having a single queue at the beginning of the iterative part of 
the job. Events coming from the backedge are pushed further and further back in 
the input queue. It would be nice to take events in round-robin order from the 
two input gates (iteration source, regular input). Otherwise, checkpoints in 
iterative jobs can take a really long time.

- We need a proper way to deal with deadlocks. I removed the part where we 
discard events in the tail upon timeout since that boils down to at-most-once 
semantics. This PR is not solving deadlocks but I think we should find a 
graceful way to tackle them. (@uce, any ideas? )
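
The round-robin idea from the first point can be sketched as follows. This is a simplified model under assumptions, not Flink's input-gate code: both inputs are plain in-memory queues, and `RoundRobinInputSketch` and `drain` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class RoundRobinInputSketch {
    /**
     * Alternates between the regular input and the feedback (back-edge) input,
     * falling back to the other queue whenever the preferred one is empty.
     */
    static List<String> drain(Queue<String> regularInput, Queue<String> feedbackInput) {
        List<String> order = new ArrayList<>();
        boolean feedbackTurn = false;
        while (!regularInput.isEmpty() || !feedbackInput.isEmpty()) {
            Queue<String> preferred = feedbackTurn ? feedbackInput : regularInput;
            Queue<String> other = feedbackTurn ? regularInput : feedbackInput;
            order.add(preferred.isEmpty() ? other.poll() : preferred.poll());
            feedbackTurn = !feedbackTurn;
        }
        return order;
    }
}
```

With a single shared queue, a barrier arriving over the back-edge waits behind every regular record already enqueued; with alternation it waits behind at most one regular record per feedback event ahead of it.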




[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2016-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15159365#comment-15159365
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user gyfora commented on the pull request:

https://github.com/apache/flink/pull/1668#issuecomment-187827823
  
I can't really add anything to the timeout question :D 

As for the snapshotting, I would go with the ListState, as that potentially 
provides a very efficient implementation for this logging phase (it is actually 
designed to handle this). Then we don't have to worry about spilling there 
either.




[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2016-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15159353#comment-15159353
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user senorcarbone commented on the pull request:

https://github.com/apache/flink/pull/1668#issuecomment-187825999
  
Thanks @tillrohrmann for the feedback!
I merged `ForwardingOneInputTask` into `StreamIterationSink`.

One more thing I missed pointing out is that when the iteration timeout 
occurs the `StreamIterationHead` flushes its log to its output before 
terminating. An alternative take would be to delay termination until the 
barrier arrives back to complete the snapshot. The problem with that version, 
even though it's correct, is that under frequent checkpoint intervals the 
iteration head could potentially never terminate because it will always be in 
snapshotting state when the timeout occurs.

Regarding the state snapshotting, should I switch to using the ListState 
representation instead or does it make no difference for the time being?




[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2016-02-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15154090#comment-15154090
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1668#issuecomment-186174994
  
Great idea @senorcarbone. I also really like it :-)

I agree with @gyfora to include the logic of the 
`ForwardingOneInputStreamTask` in the `StreamIterationTail`.

For the spillable state we can only use the `RocksDBStateBackend` at the 
moment. But I think that is fine for a first iteration. Eventually, we'll also 
add managed memory support for the memory based state backends which will make 
them equally suitable for the task.




[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2016-02-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15154030#comment-15154030
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user senorcarbone commented on the pull request:

https://github.com/apache/flink/pull/1668#issuecomment-186147597
  
Thanks for going through it Gyula!

I agree, the `ForwardingOneInputStreamTask` can be integrated in the 
`StreamIterationTail`, I will probably do that. I just wanted to make the code 
look less messy in the tail part, but you are right.

About the operators, the `RecordPusher` was already there (even though it 
was not initialised correctly). I just added the `UpstreamLogger` because I 
wanted it to follow the operator lifecycle. I think the way it is we do not 
need to override the `checkpointStatesInternal` to do the changes we discuss. 
We just need to change the operator callback method and this could also be more 
robust to changes in the StreamTask and operator interfaces and default 
behavior, just my personal view but I see your point too.

I agree with the spill buffer logic. I am only a bit confused about the 
output stream thing (the other part of the problem): is there something I can 
already use? I haven't checked recent changes. How does this work if we use the 
in-memory backend, for example? Won't the blob with all the messages be 
packaged and sent within the stateHandle to the job manager in a single message 
anyway (potentially exceeding the limits), even if we use a stream API?




[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2016-02-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15154001#comment-15154001
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user gyfora commented on the pull request:

https://github.com/apache/flink/pull/1668#issuecomment-186142276
  
Thanks Paris, I like the idea, it's a correct modification of the original 
algorithm to make it much easier to implement on Flink for the price of 
buffering more records. 

I have some comments on the implementation:

- Why did you introduce the ForwardingOneInputStreamTask if only the 
Iteration tail will implement this behaviour? I am not sure if other 
StreamTasks will do this in the future, so it might be better to just put the 
logic in the StreamIterationTail instead of adding a new abstraction
- I think the RecordPusher and UpstreamLogger embedded operators on the 
head and tail tasks are not really necessary and just add additional complexity 
to debugging and understanding the code. The UpstreamLogger only makes 
checkpointing "easier" for the implementer but we probably want to do some 
custom logic there anyway. So I would probably just override the 
checkpointStatesInternal method directly.

I agree that we definitely need to spill the buffered records to disk or 
use managed memory for this. This can be similar to the BarrierBuffer logic. We 
can combine this with checkpointing to an output stream, so that we never have 
to materialize the full state.
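
To make the spill-and-stream idea concrete, here is a rough Java sketch. It is not the code from the pull request: `SpillingRecordLog` is a hypothetical helper, records are assumed to be plain strings, and the checkpoint target is modelled as an ordinary `java.io.OutputStream` rather than any Flink state-backend class. Logged records go to a temporary file instead of the heap, and at checkpoint time the file is copied to the checkpoint stream in fixed-size chunks, so the full log is never held in memory.

```java
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not a Flink class): logs records to a temp file.
class SpillingRecordLog implements AutoCloseable {
    private final Path spillFile;
    private final DataOutputStream out;
    private int count = 0;

    SpillingRecordLog() throws IOException {
        spillFile = Files.createTempFile("iteration-log", ".bin");
        out = new DataOutputStream(
                new BufferedOutputStream(Files.newOutputStream(spillFile)));
    }

    // Appends one record to the on-disk log.
    void append(String record) throws IOException {
        out.writeUTF(record);
        count++;
    }

    // Streams the log to the checkpoint: a record count, then the spilled
    // bytes copied through a fixed 8 KiB buffer.
    void writeTo(OutputStream checkpointStream) throws IOException {
        out.flush();
        DataOutputStream header = new DataOutputStream(checkpointStream);
        header.writeInt(count);
        header.flush();
        try (InputStream in = Files.newInputStream(spillFile)) {
            byte[] chunk = new byte[8192];
            int n;
            while ((n = in.read(chunk)) != -1) {
                checkpointStream.write(chunk, 0, n);
            }
        }
    }

    // Reads the records back in their original FIFO order, e.g. on restore.
    static List<String> readAll(InputStream checkpointStream) throws IOException {
        DataInputStream in = new DataInputStream(checkpointStream);
        int n = in.readInt();
        List<String> records = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            records.add(in.readUTF());
        }
        return records;
    }

    @Override
    public void close() throws IOException {
        out.close();
        Files.deleteIfExists(spillFile);
    }
}
```

The record count is written first only so that the reader in this sketch knows when to stop; a real implementation would likely use whatever serializer and framing the task already has.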




[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2016-02-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15152692#comment-15152692
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

GitHub user senorcarbone opened a pull request:

https://github.com/apache/flink/pull/1668

[FLINK-3257] Add Exactly-Once Processing Guarantees for Iterative 
DataStream Jobs

# **[WIP]**
This is a first version of the adapted snapshot algorithm to support 
iterations. It is correct and works in practice... well, as long as memory 
capacity is sufficient for its logging requirements, but I am working on that, 
hopefully with a little help from you. Before we go into the implementation 
details, let me briefly describe the new algorithm.

## Algorithm

Our existing checkpoint algorithm has a very fluid and straightforward 
protocol. It just makes sure that all checkpoint barriers are aligned in each 
operator, so that all records before the barriers (pre-shot) are processed 
before taking a snapshot. Since waiting indefinitely for all records in-transit 
within a cycle of an execution graph can violate termination (a crucial 
liveness property), we have to... save any unprocessed records for later during 
the snapshot. In this take on the algorithm for Flink, we assign that role to 
the `Iteration Head`. This version differs from the vanilla algorithm only in 
the following steps:

1. An `Iteration Head` receives a barrier from the system runtime (as 
before) and:

- Goes into **Logging Mode**. That means that from that point on, every 
record it receives from its `Iteration Sink` is buffered in its own operator 
state and **not** forwarded further until it goes back to normal mode.
- **Forwards** the barrier to its downstream nodes (this guarantees 
liveness; otherwise we would have a deadlock).

2. Eventually, the `Iteration Head` receives
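
The logging-mode behaviour of step 1 can be sketched in Java roughly as follows. This is a toy model, not the PR's implementation: `LoggingHeadSketch` and its method names are hypothetical, records are plain strings, and "downstream" is just a list. Since the description of step 2 is cut off above, the handling of the returning barrier follows the JIRA issue description (finalize the snapshot, then emit the logged records in FIFO order) and should be read as an assumption about what the PR does.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model of an iteration head; none of these names are Flink APIs.
class LoggingHeadSketch {
    // Back-edge records held back while a checkpoint is in progress.
    private final Queue<String> log = new ArrayDeque<>();
    // Everything emitted downstream, in emission order.
    final List<String> downstream = new ArrayList<>();
    private boolean loggingMode = false;

    // Step 1: a barrier arrives from the runtime.
    void onBarrierFromRuntime(long checkpointId) {
        loggingMode = true;                          // start buffering back-edge records
        downstream.add("BARRIER-" + checkpointId);   // forward at once to avoid deadlock
    }

    // A record arrives over the back-edge from the iteration sink.
    void onBackEdgeRecord(String record) {
        if (loggingMode) {
            log.add(record);                         // buffered, not forwarded
        } else {
            downstream.add(record);                  // normal mode: pass through
        }
    }

    // Assumed from the JIRA description: the barrier returns over the
    // back-edge, the log becomes the snapshot, and the logged records are
    // released in FIFO order before returning to normal mode.
    List<String> onBarrierFromBackEdge(long checkpointId) {
        List<String> snapshot = new ArrayList<>(log);
        while (!log.isEmpty()) {
            downstream.add(log.poll());
        }
        loggingMode = false;
        return snapshot;
    }
}
```

In this model the snapshot contains exactly the records that were in transit inside the loop when the barrier was injected, which is what has to be replayed after a restart.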
 

## Example

blabla


![ftloops-topology](https://cloud.githubusercontent.com/assets/858078/13151679/7f150538-d66b-11e5-98c8-7bbe2243b810.png)


blabla


![diagram](https://cloud.githubusercontent.com/assets/858078/13151664/7361a638-d66b-11e5-94e9-64f70a8130d7.png)

## Current Implementation Details

## Open/Pending Issues




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/senorcarbone/flink ftloops

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1668.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1668


commit 38256e4c4bb00183794699027e8e4298787c66fa
Author: Paris Carbone 
Date:   2016-01-19T10:18:54Z

exactly-once processing test for stream iterations

commit dbf2625536289dc70724ef798fd02989e586d874
Author: Paris Carbone 
Date:   2016-02-18T15:49:19Z

[wip] adapt snapshot mechanism for iterative jobs






[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2016-01-18 Thread Paris Carbone (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15105997#comment-15105997
 ] 

Paris Carbone commented on FLINK-3257:
--

Hey Marton. Sorry but no. I am very actively working on it and it is the very 
core part of my work so it would be a waste to give the task to someone else. 
I'd suggest you give the master student something else if it's ok. 



[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2016-01-18 Thread JIRA

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15106006#comment-15106006
 ] 

Márton Balassi commented on FLINK-3257:
---

No problem. It is great that you are implementing the feature. It seems a bit 
nasty as it goes through many layers of the code. Good luck with it!



[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2016-01-18 Thread JIRA

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15105829#comment-15105829
 ] 

Márton Balassi commented on FLINK-3257:
---

Hey Paris, how urgent is this feature for you? I have a master student who 
would love to give this implementation a try if it is not an immediate task on 
your side.
