[GitHub] flink pull request: [FLINK-3257] Add Exactly-Once Processing Guara...

2016-04-26 Thread senorcarbone
Github user senorcarbone commented on the pull request:

https://github.com/apache/flink/pull/1668#issuecomment-214883298
  
ok good to know @uce! Let me get back to it in a couple of weeks and make 
it right; right now it is a bit impossible to find time.




[GitHub] flink pull request: [FLINK-3257] Add Exactly-Once Processing Guara...

2016-03-29 Thread senorcarbone
Github user senorcarbone commented on the pull request:

https://github.com/apache/flink/pull/1668#issuecomment-202818056
  
Thanks @StephanEwen and @uce for looking into it! I really appreciate it. 
How about the following:

1. I update this PR with the patch that uses ListState and apply some nice 
refactorings Gyula made
2. I will also address all your comments and then merge this to master
3. We start working on perfecting stream finalization on loops and 
backpressure deadlock elimination in separate PRs right away. These are 
different problems and we need to address them separately, in my view of course.




[GitHub] flink pull request: [FLINK-3257] Add Exactly-Once Processing Guara...

2016-03-24 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1668#issuecomment-200909639
  
The core idea of this is very good, and the illustration is very nice as well.

After an offline chat with @senorcarbone, we concluded that a remaining 
problem in this is currently the way it integrates with the timeout-based 
termination detection.

Which brings us to the point that we should (in my opinion) change the way 
that loops terminate. It should probably be based on end-of-stream events, to 
make it deterministic and not susceptible to delays.

The question now is whether it makes sense to do the termination change first and 
base this on top of it, or to merge this irrespective of that...




[GitHub] flink pull request: [FLINK-3257] Add Exactly-Once Processing Guara...

2016-03-24 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1668#discussion_r57327419
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java ---
@@ -17,43 +17,80 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
+import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.types.Either;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
 @Internal
-public class StreamIterationHead extends OneInputStreamTask {
+public class StreamIterationHead extends OneInputStreamTask {
 
 	private static final Logger LOG = LoggerFactory.getLogger(StreamIterationHead.class);
 
 	private volatile boolean running = true;
 
-	// ------------------------------------------------------------------------
-
+	/**
+	 * A flag that is on during the duration of a checkpoint. While onSnapshot is true the iteration head has to perform
+	 * upstream backup of all records in transit within the loop.
+	 */
+	private volatile boolean onSnapshot = false;
--- End diff --

Always accessed in lock scope, no need for `volatile`
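To illustrate the suggestion, here is a minimal, hypothetical sketch (not the PR code) of a flag that is only touched while holding the checkpoint lock, which is why `volatile` becomes unnecessary:

```java
// Hypothetical sketch: the flag is only read and written while holding the
// checkpoint lock, so the lock's memory semantics make `volatile` redundant.
public class LoggingFlagSketch {

	private final Object checkpointLock = new Object();

	private boolean onSnapshot = false; // no volatile needed

	public void startLogging() {
		synchronized (checkpointLock) {
			onSnapshot = true; // written under the lock
		}
	}

	public boolean isLogging() {
		synchronized (checkpointLock) {
			return onSnapshot; // read under the same lock
		}
	}
}
```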




[GitHub] flink pull request: [FLINK-3257] Add Exactly-Once Processing Guara...

2016-03-24 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1668#discussion_r57321347
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---
@@ -450,112 +450,121 @@ else if (operator != null) {
 		}
 	}
 
-	@Override
-	public boolean triggerCheckpoint(final long checkpointId, final long timestamp) throws Exception {
-		LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());
-
-		synchronized (lock) {
-			if (isRunning) {
-
-				// since both state checkpointing and downstream barrier emission occurs in this
-				// lock scope, they are an atomic operation regardless of the order in which they occur
-				// we immediately emit the checkpoint barriers, so the downstream operators can start
-				// their checkpoint work as soon as possible
-				operatorChain.broadcastCheckpointBarrier(checkpointId, timestamp);
-
-				// now draw the state snapshot
-				final StreamOperator[] allOperators = operatorChain.getAllOperators();
-				final StreamTaskState[] states = new StreamTaskState[allOperators.length];
+	/**
+	 * Checkpoints all operator states of the current StreamTask.
+	 * Thread-safety must be handled outside the scope of this function
+	 */
+	protected boolean checkpointStatesInternal(final long checkpointId, long timestamp) throws Exception {
--- End diff --

When rebasing we have to double check that nothing changed in this method 
when calling from `triggerCheckpoint` etc.




[GitHub] flink pull request: [FLINK-3257] Add Exactly-Once Processing Guara...

2016-03-24 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1668#discussion_r57320231
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---
@@ -450,112 +450,121 @@ else if (operator != null) {
 		}
 	}
 
-	@Override
-	public boolean triggerCheckpoint(final long checkpointId, final long timestamp) throws Exception {
-		LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());
-
-		synchronized (lock) {
-			if (isRunning) {
-
-				// since both state checkpointing and downstream barrier emission occurs in this
-				// lock scope, they are an atomic operation regardless of the order in which they occur
-				// we immediately emit the checkpoint barriers, so the downstream operators can start
-				// their checkpoint work as soon as possible
-				operatorChain.broadcastCheckpointBarrier(checkpointId, timestamp);
-
-				// now draw the state snapshot
-				final StreamOperator[] allOperators = operatorChain.getAllOperators();
-				final StreamTaskState[] states = new StreamTaskState[allOperators.length];
+	/**
+	 * Checkpoints all operator states of the current StreamTask.
+	 * Thread-safety must be handled outside the scope of this function
+	 */
+	protected boolean checkpointStatesInternal(final long checkpointId, long timestamp) throws Exception {
--- End diff --

Regarding the JavaDocs:
- The idiomatic style is to have a short description and then a blank line 
before more details (the first line will be displayed as a summary in the IDE etc.)
- The `of the current StreamTask` is clear from context
- The thread-safety part should be more explicit, for instance `The caller 
has to make sure to call this method in scope of the task's checkpoint lock`.
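For illustration, a sketch of how the JavaDoc could look following that convention (wording is only a suggestion, not the final text):

```java
/**
 * Checkpoints all operator states of this task.
 *
 * <p>The caller has to make sure to call this method in the scope of the
 * task's checkpoint lock; no synchronization is performed here.
 *
 * @param checkpointId ID of the checkpoint to draw
 * @param timestamp    timestamp of the checkpoint
 * @return true if the state snapshot was drawn, false if the task was not running
 */
protected boolean checkpointStatesInternal(final long checkpointId, long timestamp) throws Exception {
	// method body unchanged
	return true;
}
```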




[GitHub] flink pull request: [FLINK-3257] Add Exactly-Once Processing Guara...

2016-03-24 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1668#discussion_r57319737
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---
@@ -450,112 +450,121 @@ else if (operator != null) {
 		}
 	}
 
-	@Override
-	public boolean triggerCheckpoint(final long checkpointId, final long timestamp) throws Exception {
-		LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());
-
-		synchronized (lock) {
-			if (isRunning) {
-
-				// since both state checkpointing and downstream barrier emission occurs in this
-				// lock scope, they are an atomic operation regardless of the order in which they occur
-				// we immediately emit the checkpoint barriers, so the downstream operators can start
-				// their checkpoint work as soon as possible
-				operatorChain.broadcastCheckpointBarrier(checkpointId, timestamp);
-
-				// now draw the state snapshot
-				final StreamOperator[] allOperators = operatorChain.getAllOperators();
-				final StreamTaskState[] states = new StreamTaskState[allOperators.length];
+	/**
+	 * Checkpoints all operator states of the current StreamTask.
+	 * Thread-safety must be handled outside the scope of this function
+	 */
+	protected boolean checkpointStatesInternal(final long checkpointId, long timestamp) throws Exception {
--- End diff --

What about naming this as in the comments `drawStateSnapshot`? That it is 
internal is more or less communicated by the fact that it is a `protected` 
method.




[GitHub] flink pull request: [FLINK-3257] Add Exactly-Once Processing Guara...

2016-02-29 Thread senorcarbone
Github user senorcarbone commented on the pull request:

https://github.com/apache/flink/pull/1668#issuecomment-190279441
  
You can find an alternative version using `ListState` in the following branch:
https://github.com/senorcarbone/flink/commits/ftloopsalt
I noticed that this version is quite a bit **slower** than the one with custom 
operator state, but it can apparently support larger states.

I am (ab)using the PartitionedState to store the ListState under a single key, 
as @gyfora suggested, since it is the only way to obtain the nice 
representations at the moment. It would be nice to have them available for 
operator state snapshots as well - @aljoscha, have you thought about it? When 
there is free time (after the release) it would be nice to hear what @aljoscha 
and @StephanEwen think of the two takes as well. No hurry, just take a look 
when you have time!
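For reference, a rough sketch (hypothetical helper, not the code in the branch) of the kind of `ListState`-backed log described above; how the state is obtained (e.g. through the partitioned-state API under one constant key) is deliberately left out here:

```java
// Hypothetical sketch of a ListState-backed upstream log. The ListState is
// assumed to be provided by the state backend under a single constant key.
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.util.Collector;

public class ListStateLogSketch<T> {

	private final ListState<T> upstreamLog;

	public ListStateLogSketch(ListState<T> upstreamLog) {
		this.upstreamLog = upstreamLog;
	}

	/** Buffer a record that arrived from the backedge while a snapshot is in progress. */
	public void log(T record) throws Exception {
		upstreamLog.add(record);
	}

	/** Replay and then drop the buffered records once the snapshot is complete. */
	public void flush(Collector<T> out) throws Exception {
		for (T record : upstreamLog.get()) {
			out.collect(record);
		}
		upstreamLog.clear();
	}
}
```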

The two annoying issues I noticed during testing, which we need to check soon, 
are the following:

- The overhead of transmitting and finally delivering a barrier from the 
`head` to its consumers increases over time (with each subsequent checkpoint). 
That is due to having a single queue at the beginning of the iterative part of 
the job: events coming from the backedge are pushed further behind in the input 
queue. It would be nice to take events in round-robin fashion from the two input 
gates (iteration source, regular input); a tiny sketch of such round-robin 
polling is shown after this list. Otherwise, checkpoints in iterative 
jobs can become really prolonged because of this.

- We need a proper way to deal with deadlocks. I removed the part where we 
discard events in the tail upon timeout, since that boils down to at-most-once 
semantics. This PR is not solving deadlocks, but I think we should find a 
graceful way to tackle them. (@uce, any ideas?)
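As a toy illustration of the round-robin idea from the first point (plain Java, hypothetical, not how the runtime's input gates actually work):

```java
// Toy sketch: poll the backedge and the regular input alternately instead of
// draining one queue first, so loop records are not starved behind the input.
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class RoundRobinReaderSketch {

	public static void main(String[] args) throws InterruptedException {
		BlockingQueue<String> backedge = new ArrayBlockingQueue<>(16);
		BlockingQueue<String> regularInput = new ArrayBlockingQueue<>(16);
		backedge.add("loop-record");
		regularInput.add("input-record");

		List<BlockingQueue<String>> gates = Arrays.asList(backedge, regularInput);
		int next = 0;
		for (int i = 0; i < 4; i++) {
			// alternate between the two "gates" on every poll
			String record = gates.get(next).poll(10, TimeUnit.MILLISECONDS);
			if (record != null) {
				System.out.println("processing " + record + " from gate " + next);
			}
			next = (next + 1) % gates.size();
		}
	}
}
```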




[GitHub] flink pull request: [FLINK-3257] Add Exactly-Once Processing Guara...

2016-02-23 Thread gyfora
Github user gyfora commented on the pull request:

https://github.com/apache/flink/pull/1668#issuecomment-187827823
  
I can't really add anything to the timeout question :D 

As for the snapshotting, I would go with the ListState, as that potentially 
provides a very efficient implementation for this logging phase (it is actually 
designed to handle this). Then we don't have to worry about spilling there 
either.




[GitHub] flink pull request: [FLINK-3257] Add Exactly-Once Processing Guara...

2016-02-23 Thread senorcarbone
Github user senorcarbone commented on the pull request:

https://github.com/apache/flink/pull/1668#issuecomment-187825999
  
Thanks @tillrohrmann for the feedback!
I merged `ForwardingOneInputTask` into `StreamIterationSink`.

One more thing I missed pointing out is that when the iteration timeout 
occurs, the `StreamIterationHead` flushes its log to its output before 
terminating. An alternative take would be to delay termination until the 
barrier arrives back to complete the snapshot. The problem with that version, 
even though it is correct, is that under frequent checkpoint intervals the 
iteration head could potentially never terminate, because it will always be in 
snapshotting state when the timeout occurs.

Regarding the state snapshotting, should I switch to using the ListState 
representation instead or does it make no difference for the time being?




[GitHub] flink pull request: [FLINK-3257] Add Exactly-Once Processing Guara...

2016-02-19 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1668#issuecomment-186174994
  
Great idea @senorcarbone. I also really like it :-)

I agree with @gyfora to include the logic of the 
`ForwardingOneInputStreamTask` in the `StreamIterationTail`.

For the spillable state we can only use the `RocksDBStateBackend` at the 
moment. But I think that is fine for a first iteration. Eventually, we'll also 
add managed memory support for the memory-based state backends, which will make 
them equally suitable for the task.
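For completeness, a minimal sketch of what enabling the RocksDB backend looks like (assuming the `flink-statebackend-rocksdb` artifact from `flink-contrib` is on the classpath; the checkpoint URI is illustrative):

```java
// Minimal sketch: use the RocksDB state backend so that large logged state can
// spill to local disk instead of living entirely on the JVM heap.
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDBBackendSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.enableCheckpointing(5000); // draw a checkpoint every 5 seconds
		// checkpoint data is written to a durable filesystem (URI is illustrative)
		env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
	}
}
```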




[GitHub] flink pull request: [FLINK-3257] Add Exactly-Once Processing Guara...

2016-02-19 Thread gyfora
Github user gyfora commented on the pull request:

https://github.com/apache/flink/pull/1668#issuecomment-186142276
  
Thanks Paris, I like the idea, it's a correct modification of the original 
algorithm to make it much easier to implement on Flink for the price of 
buffering more records. 

I have some comments on the implementation:

- Why did you introduce the ForwardingOneInputStreamTask if only the 
Iteration tail will implement this behaviour? I am not sure whether other 
StreamTasks will do this in the future, so it might be better to just put the 
logic in the StreamIterationTail instead of adding a new abstraction.
- I think the RecordPusher and UpstreamLogger embedded operators on the 
head and tail tasks are not really necessary and just add additional complexity 
to debugging and understanding the code. The UpstreamLogger only makes 
checkpointing "easier" for the implementer, but we probably want to do some 
custom logic there anyway. So I would probably just override the 
checkpointStatesInternal method directly.

I agree that we definitely need to spill the buffered records to disk or 
use managed memory for this. This can be similar to the BarrierBuffer logic. We 
can combine this with checkpointing to an output stream to never have to 
materialize the full state.
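A rough sketch of that direction (hypothetical names; assumes the state backend's checkpoint output stream API of that time): each buffered record is serialized straight into a checkpoint stream, so the full log never has to be materialized in memory:

```java
// Hypothetical sketch: spill the logged records incrementally into a
// checkpoint output stream provided by the state backend, and keep only the
// resulting state handle, similar in spirit to the BarrierBuffer spilling.
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.StreamStateHandle;

public class SpillingLogSketch<T> {

	private final AbstractStateBackend backend;
	private final TypeSerializer<T> serializer;

	private AbstractStateBackend.CheckpointStateOutputStream stream;
	private DataOutputView view;

	public SpillingLogSketch(AbstractStateBackend backend, TypeSerializer<T> serializer) {
		this.backend = backend;
		this.serializer = serializer;
	}

	/** Open a checkpoint stream when the logging phase starts. */
	public void open(long checkpointId, long timestamp) throws Exception {
		stream = backend.createCheckpointStateOutputStream(checkpointId, timestamp);
		view = new DataOutputViewStreamWrapper(stream);
	}

	/** Append one in-transit record to the spilled log. */
	public void append(T record) throws Exception {
		serializer.serialize(record, view);
	}

	/** Finish logging and return a handle pointing to the spilled log. */
	public StreamStateHandle finish() throws Exception {
		return stream.closeAndGetHandle();
	}
}
```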




[GitHub] flink pull request: [FLINK-3257] Add Exactly-Once Processing Guara...

2016-02-18 Thread senorcarbone
GitHub user senorcarbone opened a pull request:

https://github.com/apache/flink/pull/1668

[FLINK-3257] Add Exactly-Once Processing Guarantees for Iterative 
DataStream Jobs

# **[WIP]**
This is a first version of the adapted snapshot algorithm to support 
iterations. It is correct and works in practice...well, as long as memory capacity is 
enough for its logging requirements, but I am working on that, hopefully with a 
little help from you. Before we go into the implementation details, let me 
briefly describe the new algorithm.

## Algorithm

Our existing checkpoint algorithm has a very fluid and straightforward 
protocol. It just makes sure that all checkpoint barriers are aligned in each 
operator so that all records before the barriers (the pre-shot records) are processed before 
taking a snapshot. Since waiting indefinitely for all records in transit within 
a cycle of the execution graph can violate termination (a crucial liveness 
property), we have to...save any unprocessed records for later during the 
snapshot. In this take of the algorithm on Flink we assign that role to the 
`Iteration Head`. The steps in which this version differs from the vanilla algorithm are 
simply the following:

1. An `Iteration Head` receives a barrier from the system runtime (as 
before) and:

-  Goes into **Logging Mode**. That means that from that point on every 
record it receives from its `Iteration Sink` is buffered in its own operator 
state and **not** forwarded further until it goes back to normal mode.
- **Forwards** the barrier to its downstream nodes (this guarantees 
liveness, otherwise we have a deadlock).

2. Eventually, the `Iteration Head` receives the barrier back from its 
`Iteration Sink` through the backedge. At that point it completes its snapshot 
(the buffered records become its operator state), switches back to normal mode, 
and forwards the buffered records downstream (see the sketch right after this list).
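To make the two steps concrete, here is a simplified, hypothetical sketch of the head's behaviour (plain Java, not the actual PR code):

```java
// Simplified sketch of the loop checkpointing protocol described above:
// while a checkpoint is in flight, records from the backedge are buffered
// instead of forwarded, and replayed once the barrier comes back around.
import java.util.ArrayDeque;
import java.util.Queue;

public class IterationHeadProtocolSketch<T> {

	private final Queue<T> upstreamLog = new ArrayDeque<>();
	private boolean loggingMode = false;

	/** Step 1: a checkpoint barrier arrives from the runtime. */
	public void onCheckpointBarrier(Downstream<T> downstream) {
		loggingMode = true;            // start upstream backup of loop records
		downstream.broadcastBarrier(); // forward the barrier immediately (liveness)
	}

	/** A record arrives from the iteration sink through the backedge. */
	public void onBackedgeRecord(T record, Downstream<T> downstream) {
		if (loggingMode) {
			upstreamLog.add(record);   // buffer instead of forwarding
		} else {
			downstream.emit(record);
		}
	}

	/** Step 2: the barrier returns through the backedge and the log becomes the snapshot. */
	public void onBarrierFromBackedge(Downstream<T> downstream) {
		snapshotLog();                 // buffered records are stored as operator state
		loggingMode = false;
		T record;
		while ((record = upstreamLog.poll()) != null) {
			downstream.emit(record);   // replay the buffered records in order
		}
	}

	private void snapshotLog() {
		// in the PR this would persist the buffered records (e.g. as operator state / ListState)
	}

	/** Minimal downstream abstraction used only by this sketch. */
	public interface Downstream<T> {
		void emit(T record);
		void broadcastBarrier();
	}
}
```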
 

## Example

blabla


![ftloops-topology](https://cloud.githubusercontent.com/assets/858078/13151679/7f150538-d66b-11e5-98c8-7bbe2243b810.png)


blabla


![diagram](https://cloud.githubusercontent.com/assets/858078/13151664/7361a638-d66b-11e5-94e9-64f70a8130d7.png)

## Current Implementation Details

## Open/Pending Issues




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/senorcarbone/flink ftloops

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1668.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1668


commit 38256e4c4bb00183794699027e8e4298787c66fa
Author: Paris Carbone 
Date:   2016-01-19T10:18:54Z

exactly-once processing test for stream iterations

commit dbf2625536289dc70724ef798fd02989e586d874
Author: Paris Carbone 
Date:   2016-02-18T15:49:19Z

[wip] adapt snapshot mechanism for iterative jobs



