[GitHub] flink pull request: [FLINK-3257] Add Exactly-Once Processing Guara...
Github user senorcarbone commented on the pull request: https://github.com/apache/flink/pull/1668#issuecomment-214883298

OK, good to know @uce! Let me get back to it in a couple of weeks and make it right; right now it is nearly impossible to find time.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
Github user senorcarbone commented on the pull request: https://github.com/apache/flink/pull/1668#issuecomment-202818056

Thanks @StephanEwen and @uce for looking into it! I really appreciate it. How about the following:
1. I update this PR with the patch that uses `ListState` and apply some nice refactorings Gyula made.
2. I also address all your comments and then merge this to master.
3. We start working on perfecting stream finalization on loops and backpressure deadlock elimination in separate PRs right away. These are different problems and, in my view, we need to address them separately.
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1668#issuecomment-200909639

The core idea of this is very good, and the illustration is very nice, too. After an offline chat with @senorcarbone, we concluded that a remaining problem is the way this currently integrates with the timeout-based termination detection. This brings us to the point that we should (in my opinion) change the way that loops terminate. Termination should probably be based on end-of-stream events, to make it deterministic and not susceptible to delays. The question now is: does it make sense to do the termination change first and base this PR on top of it, or to merge this irrespective of that?
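To make the contrast concrete, here is a minimal sketch in plain Java (not Flink code) of the two termination strategies for a loop head that drains its feedback queue. The class `LoopTermination`, its methods, and the `END_OF_STREAM` sentinel are hypothetical; a real end-of-stream event would be a runtime event rather than a magic string.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical sketch (not Flink code) of two ways a loop head could decide
 * that the feedback edge of an iteration is exhausted.
 */
public final class LoopTermination {

    /** Illustrative sentinel standing in for an end-of-stream event on the feedback edge. */
    public static final String END_OF_STREAM = "__END_OF_STREAM__";

    private LoopTermination() {}

    /**
     * Timeout-based: stops as soon as no record arrives within waitMillis.
     * A record that is merely delayed longer than that is left behind.
     */
    public static List<String> drainWithTimeout(BlockingQueue<String> feedback, long waitMillis) {
        List<String> seen = new ArrayList<>();
        try {
            String record;
            while ((record = feedback.poll(waitMillis, TimeUnit.MILLISECONDS)) != null) {
                seen.add(record);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        return seen;
    }

    /**
     * End-of-stream based: blocks until the explicit marker arrives, so the result
     * does not depend on how long records take to travel around the loop.
     */
    public static List<String> drainUntilEndOfStream(BlockingQueue<String> feedback) {
        List<String> seen = new ArrayList<>();
        try {
            String record;
            while (!END_OF_STREAM.equals(record = feedback.take())) {
                seen.add(record);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        return seen;
    }
}
```

For example, if the queue holds only `"a"` while `"b"` is still travelling around the loop, `drainWithTimeout(queue, 10)` returns `[a]` after roughly 10 ms and leaves the loop, whereas `drainUntilEndOfStream` keeps blocking until `"b"` and the marker arrive. That is the determinism argument in miniature.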
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1668#discussion_r57327419

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java ---
```diff
@@ -17,43 +17,80 @@ package org.apache.flink.streaming.runtime.tasks;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
+import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.types.Either;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
 @Internal
-public class StreamIterationHead extends OneInputStreamTask{
+public class StreamIterationHead extends OneInputStreamTask {
 
 	private static final Logger LOG = LoggerFactory.getLogger(StreamIterationHead.class);
 
 	private volatile boolean running = true;
 
-	// -
+	/**
+	 * A flag that is on during the duration of a checkpoint. While onSnapshot is true the iteration head has to perform
+	 * upstream backup of all records in transit within the loop.
+	 */
+	private volatile boolean onSnapshot = false;
```
--- End diff --

Always accessed in lock scope, no need for `volatile`
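The reasoning behind this comment: under the Java memory model, releasing a monitor happens-before every subsequent acquisition of the same monitor, so a field that is only read and written inside `synchronized (lock)` is already visible across threads, and `volatile` adds nothing. A minimal sketch (the class and method names are hypothetical, not the PR's code):

```java
/**
 * Hypothetical sketch: a snapshot flag that is only ever read or written while
 * holding the task's checkpoint lock, so it does not need to be volatile.
 */
public class LockGuardedFlag {

    private final Object lock = new Object();

    /** Guarded by lock; a plain field is sufficient. */
    private boolean onSnapshot = false;

    public void startSnapshot() {
        synchronized (lock) {
            onSnapshot = true;
        }
    }

    public void finishSnapshot() {
        synchronized (lock) {
            onSnapshot = false;
        }
    }

    public boolean isOnSnapshot() {
        synchronized (lock) {
            return onSnapshot;
        }
    }
}
```

If the flag were ever read outside the lock (for example, on a lock-free fast path), it would need `volatile` again.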
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1668#discussion_r57321347

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---
```diff
@@ -450,112 +450,121 @@ else if (operator != null) {
 			}
 		}
 
-	@Override
-	public boolean triggerCheckpoint(final long checkpointId, final long timestamp) throws Exception {
-		LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());
-
-		synchronized (lock) {
-			if (isRunning) {
-
-				// since both state checkpointing and downstream barrier emission occurs in this
-				// lock scope, they are an atomic operation regardless of the order in which they occur
-				// we immediately emit the checkpoint barriers, so the downstream operators can start
-				// their checkpoint work as soon as possible
-				operatorChain.broadcastCheckpointBarrier(checkpointId, timestamp);
-
-				// now draw the state snapshot
-				final StreamOperator[] allOperators = operatorChain.getAllOperators();
-				final StreamTaskState[] states = new StreamTaskState[allOperators.length];
+	/**
+	 * Checkpoints all operator states of the current StreamTask.
+	 * Thread-safety must be handled outside the scope of this function
+	 */
+	protected boolean checkpointStatesInternal(final long checkpointId, long timestamp) throws Exception {
```
--- End diff --

When rebasing, we have to double-check that nothing changed in this method when calling it from `triggerCheckpoint` etc.
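The refactoring quoted in this diff turns `triggerCheckpoint` into a wrapper that takes the checkpoint lock and delegates to a protected hook that subclasses such as the iteration head can override. That is why the hook has to be re-checked against every caller when rebasing. A heavily simplified, hypothetical sketch of that shape follows; the class names and fields are illustrative, and barrier broadcasting and operator-state collection are omitted.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, heavily simplified stand-ins for a stream task and an iteration head. */
public class TemplateMethodSketch {

    static class SketchTask {
        protected final Object lock = new Object();
        protected volatile boolean isRunning = true;
        protected final List<Long> checkpointed = new ArrayList<>();

        /** Entry point: acquires the checkpoint lock, then delegates to the overridable hook. */
        public boolean triggerCheckpoint(long checkpointId, long timestamp) {
            synchronized (lock) {
                if (!isRunning) {
                    return false;
                }
                return checkpointStatesInternal(checkpointId, timestamp);
            }
        }

        /** Hook that draws the snapshot; every caller invokes it with the lock already held. */
        protected boolean checkpointStatesInternal(long checkpointId, long timestamp) {
            checkpointed.add(checkpointId);
            return true;
        }
    }

    static class SketchIterationHead extends SketchTask {
        boolean onSnapshot = false; // written only from the hook, i.e. under the lock

        @Override
        protected boolean checkpointStatesInternal(long checkpointId, long timestamp) {
            onSnapshot = true; // enter logging mode, then run the regular snapshot logic
            return super.checkpointStatesInternal(checkpointId, timestamp);
        }
    }
}
```

Any new caller of the hook (for instance, a second trigger path added upstream while the PR is open) silently inherits the "lock already held" assumption, which is exactly what the rebase check guards against.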
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1668#discussion_r57320231

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---
(excerpt of hunk `@@ -450,112 +450,121 @@`; the full hunk is quoted in the comment above)
```diff
+	/**
+	 * Checkpoints all operator states of the current StreamTask.
+	 * Thread-safety must be handled outside the scope of this function
+	 */
+	protected boolean checkpointStatesInternal(final long checkpointId, long timestamp) throws Exception {
```
--- End diff --

Regarding the JavaDocs:
- The idiomatic style is to have a short description, then a blank line, then more details (the first sentence is displayed as a summary in the IDE, etc.).
- The `of the current StreamTask` part is clear from context.
- The thread-safety part should be more explicit, for instance `The caller has to make sure to call this method in scope of the task's checkpoint lock`.
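Applied to this method, those suggestions could give a JavaDoc along the following lines. The hypothetical sketch also shows an optional extra that was not part of the review: checking the documented contract at runtime with the JDK's `Thread.holdsLock`. The class and its fields are illustrative only.

```java
/** Hypothetical holder class used only to illustrate the JavaDoc and the lock contract. */
public class CheckpointLockContract {

    private final Object lock = new Object();
    private int snapshots = 0;

    /**
     * Checkpoints all operator states.
     *
     * <p>The caller has to make sure to call this method in scope of the task's
     * checkpoint lock.
     *
     * @throws IllegalStateException if the calling thread does not hold the checkpoint lock
     */
    protected boolean checkpointStatesInternal(long checkpointId, long timestamp) {
        if (!Thread.holdsLock(lock)) {
            throw new IllegalStateException("checkpoint lock not held for checkpoint " + checkpointId);
        }
        snapshots++;
        return true;
    }

    /** Correct usage: acquire the lock first, then call the internal method. */
    public boolean triggerCheckpoint(long checkpointId, long timestamp) {
        synchronized (lock) {
            return checkpointStatesInternal(checkpointId, timestamp);
        }
    }

    public int snapshotCount() {
        synchronized (lock) {
            return snapshots;
        }
    }
}
```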
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1668#discussion_r57319737

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---
(excerpt of hunk `@@ -450,112 +450,121 @@`; the full hunk is quoted in the comment above)
```diff
+	protected boolean checkpointStatesInternal(final long checkpointId, long timestamp) throws Exception {
```
--- End diff --

What about naming this `drawStateSnapshot`, as in the existing `// now draw the state snapshot` comment? That it is internal is more or less communicated by the fact that it is a `protected` method.
Github user senorcarbone commented on the pull request: https://github.com/apache/flink/pull/1668#issuecomment-190279441

You can find an alternative version using `ListState` in the following branch: https://github.com/senorcarbone/flink/commits/ftloopsalt

I noticed that this version is considerably **slower** than the one with custom operator state, but it apparently supports larger states. I am (ab)using the PartitionedState to store the `ListState` under a single key, as @gyfora suggested, since that is currently the only way to obtain these state representations. It would be nice to have them available for operator state snapshots as well. @aljoscha, have you thought about it?

When there is free time (after the release), it would be nice to hear what @aljoscha and @StephanEwen think of the two approaches. No hurry, just take a look when you have time!

The two annoying issues I noticed during testing, which we need to look into soon, are the following:
- The overhead of transmitting and finally delivering a barrier from the `head` to its consumers grows over time (with each subsequent checkpoint). This is due to having a single queue at the beginning of the iterative part of the job: events coming from the back edge are pushed further and further behind the input queue. It would be nice to take events in round-robin fashion from the two input gates (iteration source, regular input). Otherwise, checkpoints in iterative jobs can take a very long time to complete.
- We need a proper way to deal with deadlocks. I removed the part where we discard events in the tail upon timeout, since that boils down to at-most-once semantics. This PR does not solve deadlocks, but I think we should find a graceful way to tackle them. (@uce, any ideas?)
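The round-robin idea from the first issue can be sketched with two plain queues standing in for the regular input and the feedback (iteration source) input. This is a hypothetical illustration, not Flink's input gate API; `RoundRobinInputs` and `next()` are assumed names.

```java
import java.util.Queue;

/**
 * Hypothetical sketch: read from two inputs in round-robin order so that
 * elements on the feedback edge are not starved by a backlog of regular input.
 */
public class RoundRobinInputs<T> {

    private final Queue<T> regularInput;
    private final Queue<T> feedbackInput;
    private boolean preferFeedback = false;

    public RoundRobinInputs(Queue<T> regularInput, Queue<T> feedbackInput) {
        this.regularInput = regularInput;
        this.feedbackInput = feedbackInput;
    }

    /** Returns the next element, alternating between inputs; null if both are empty. */
    public T next() {
        Queue<T> first = preferFeedback ? feedbackInput : regularInput;
        Queue<T> second = preferFeedback ? regularInput : feedbackInput;
        preferFeedback = !preferFeedback; // switch preference on every call
        T element = first.poll();
        return element != null ? element : second.poll(); // fall back if preferred input is empty
    }
}
```

With regular input `[r1, r2, r3]` and a barrier waiting on the feedback queue, the sketch emits `r1, barrier, r2, r3`, whereas a single FIFO that received the barrier last would deliver it only after all three records.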
Github user gyfora commented on the pull request: https://github.com/apache/flink/pull/1668#issuecomment-187827823

I can't really add anything to the timeout question :D As for the snapshotting, I would go with `ListState`, as it can potentially provide a very efficient implementation for this logging phase (it is actually designed to handle this). Then we don't have to worry about spilling there either.
Github user senorcarbone commented on the pull request: https://github.com/apache/flink/pull/1668#issuecomment-187825999

Thanks @tillrohrmann for the feedback! I merged `ForwardingOneInputStreamTask` into `StreamIterationTail`. One more thing I missed pointing out: when the iteration timeout occurs, the `StreamIterationHead` flushes its log to its output before terminating. An alternative would be to delay termination until the barrier arrives back to complete the snapshot. The problem with that version, even though it is correct, is that under frequent checkpoint intervals the iteration head could potentially never terminate, because it would always be in the snapshotting state when the timeout occurs. Regarding the state snapshotting, should I switch to using the `ListState` representation instead, or does it make no difference for the time being?
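The two timeout policies can be contrasted in a small hypothetical sketch (`TimeoutPolicySketch`, `onTimeout`, and the `waitForBarrier` switch are illustrative names, not code from the PR): flushing lets the head terminate even mid-snapshot, while waiting for the barrier keeps the snapshot complete but refuses to terminate whenever a snapshot is in progress.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of what an iteration head might do when its feedback timeout fires. */
public class TimeoutPolicySketch {

    private final List<String> log = new ArrayList<>();    // records buffered while snapshotting
    private final List<String> output = new ArrayList<>(); // records emitted downstream
    private boolean onSnapshot;

    public void startSnapshot() {
        onSnapshot = true;
    }

    public void logRecord(String record) {
        log.add(record);
    }

    /**
     * Called when no feedback record arrived within the iteration timeout.
     *
     * @param waitForBarrier if true, refuse to terminate while a snapshot is in progress
     * @return true if the head may terminate now
     */
    public boolean onTimeout(boolean waitForBarrier) {
        if (!onSnapshot) {
            return true;
        }
        if (waitForBarrier) {
            // Correct, but with frequent checkpoints this branch may be taken on every timeout.
            return false;
        }
        // Flush-then-terminate: emit the logged records downstream and stop.
        output.addAll(log);
        log.clear();
        onSnapshot = false;
        return true;
    }

    public List<String> output() {
        return output;
    }
}
```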
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/1668#issuecomment-186174994

Great idea @senorcarbone. I also really like it :-) I agree with @gyfora that the logic of the `ForwardingOneInputStreamTask` should be included in the `StreamIterationTail`. For spillable state, we can only use the `RocksDBStateBackend` at the moment, but I think that is fine for a first iteration. Eventually, we'll also add managed memory support to the memory-based state backends, which will make them equally suitable for the task.
Github user gyfora commented on the pull request: https://github.com/apache/flink/pull/1668#issuecomment-186142276

Thanks Paris, I like the idea. It's a correct modification of the original algorithm that makes it much easier to implement on Flink, at the price of buffering more records. I have some comments on the implementation:
- Why did you introduce the `ForwardingOneInputStreamTask` if only the iteration tail implements this behaviour? I am not sure whether other `StreamTask`s will do this in the future, so it might be better to just put the logic in the `StreamIterationTail` instead of adding a new abstraction.
- I think the `RecordPusher` and `UpstreamLogger` embedded operators on the head and tail tasks are not really necessary and just add complexity to debugging and understanding the code. The `UpstreamLogger` only makes checkpointing "easier" for the implementer, but we probably want some custom logic there anyway. So I would probably just override the `checkpointStatesInternal` method directly.

I agree that we definitely need to spill the buffered records to disk or use managed memory for this. This can be similar to the `BarrierBuffer` logic. We can combine this with checkpointing to an output stream so that we never have to materialize the full state.
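A minimal sketch of the spilling idea, assuming records can be serialized as `long` values and using a plain temporary file through `java.io` rather than Flink's managed memory or its `BarrierBuffer` (neither is reproduced here). `SpillingLog` and its methods are hypothetical: records stay in memory up to a threshold, later ones are appended to disk, and `replay` streams everything back in arrival order without loading the spilled part back into memory.

```java
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

/** Hypothetical upstream-backup log that spills to a temp file past an in-memory threshold. */
public class SpillingLog implements AutoCloseable {

    private final int maxInMemory;
    private final List<Long> inMemory = new ArrayList<>();
    private File spillFile;
    private DataOutputStream spillOut;

    public SpillingLog(int maxInMemory) {
        this.maxInMemory = maxInMemory;
    }

    /** Appends a record, spilling to disk once the in-memory part is full. */
    public void add(long record) throws IOException {
        if (inMemory.size() < maxInMemory) {
            inMemory.add(record);
            return;
        }
        if (spillOut == null) {
            spillFile = File.createTempFile("upstream-log", ".bin");
            spillFile.deleteOnExit();
            spillOut = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(spillFile)));
        }
        spillOut.writeLong(record);
    }

    /** Streams all records in arrival order: the in-memory prefix first, then the spilled tail. */
    public void replay(LongConsumer consumer) throws IOException {
        for (long r : inMemory) {
            consumer.accept(r);
        }
        if (spillOut == null) {
            return;
        }
        spillOut.flush(); // make every buffered write visible to the reader
        try (DataInputStream in = new DataInputStream(new BufferedInputStream(new FileInputStream(spillFile)))) {
            while (true) {
                consumer.accept(in.readLong());
            }
        } catch (EOFException endOfSpill) {
            // all spilled records have been read
        }
    }

    @Override
    public void close() throws IOException {
        if (spillOut != null) {
            spillOut.close();
            spillFile.delete();
        }
    }
}
```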
GitHub user senorcarbone opened a pull request: https://github.com/apache/flink/pull/1668

[FLINK-3257] Add Exactly-Once Processing Guarantees for Iterative DataStream Jobs

# **[WIP]**

This is a first version of the adapted snapshot algorithm to support iterations. It is correct and works in practice... well, as long as there is enough memory for its logging requirements, but I am working on that, hopefully with a little help from you. Before we go into the implementation details, let me briefly describe the new algorithm.

## Algorithm

Our existing checkpoint algorithm has a very fluid and straightforward protocol. It just makes sure that all checkpoint barriers are aligned in each operator, so that all records before the barriers (pre-shot) are processed before taking a snapshot. Since waiting indefinitely for all records in transit within a cycle of an execution graph can violate termination (a crucial liveness property), we have to save any unprocessed records for later during the snapshot. In this take of the algorithm on Flink, we assign that role to the `Iteration Head`. The steps in which this version differs from the vanilla algorithm are simply the following:

1. An `Iteration Head` receives a barrier from the system runtime (as before) and:
   - Goes into **Logging Mode**. That means that from that point on, every record it receives from its `Iteration Sink` is buffered in its own operator state and **not** forwarded further until it goes back to normal mode.
   - **Forwards** the barrier to its downstream nodes (this guarantees liveness; otherwise we would have a deadlock).
2. Eventually, the `Iteration Head` receives

## Example

blabla

![ftloops-topology](https://cloud.githubusercontent.com/assets/858078/13151679/7f150538-d66b-11e5-98c8-7bbe2243b810.png)

blabla

![diagram](https://cloud.githubusercontent.com/assets/858078/13151664/7361a638-d66b-11e5-94e9-64f70a8130d7.png)

## Current Implementation Details

## Open/Pending Issues

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/senorcarbone/flink ftloops

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1668.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1668

commit 38256e4c4bb00183794699027e8e4298787c66fa
Author: Paris Carbone
Date: 2016-01-19T10:18:54Z

    exactly-once processing test for stream iterations

commit dbf2625536289dc70724ef798fd02989e586d874
Author: Paris Carbone
Date: 2016-02-18T15:49:19Z

    [wip] adapt snapshot mechanism for iterative jobs
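The algorithm steps from the PR description can be sketched as a small, single-threaded state machine for the iteration head. This is a hypothetical illustration, not the PR's code: `IterationHeadSketch`, its methods, and the `Barrier` class are assumed names. Step 2 is incomplete in the description, so the completion step (the barrier coming back over the feedback edge, the log being stored as the head's snapshot state, and the buffered records then being released downstream) is an assumption based on the rest of the discussion.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical single-threaded sketch of the iteration head's logging mode.
 * Downstream elements are either records (String) or Barrier objects.
 */
public class IterationHeadSketch {

    /** Illustrative checkpoint barrier carrying only its id. */
    public static final class Barrier {
        final long checkpointId;

        public Barrier(long checkpointId) {
            this.checkpointId = checkpointId;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Barrier && ((Barrier) o).checkpointId == checkpointId;
        }

        @Override
        public int hashCode() {
            return Long.hashCode(checkpointId);
        }

        @Override
        public String toString() {
            return "Barrier(" + checkpointId + ")";
        }
    }

    private final List<Object> downstream = new ArrayList<>();         // what the head emits
    private final List<String> log = new ArrayList<>();                // upstream backup of in-loop records
    private final Map<Long, List<String>> snapshots = new HashMap<>(); // stand-in for operator state
    private boolean onSnapshot = false;

    /** Step 1: barrier from the runtime; enter logging mode and forward the barrier at once. */
    public void onBarrierFromRuntime(long checkpointId) {
        onSnapshot = true;
        downstream.add(new Barrier(checkpointId)); // forwarding immediately avoids waiting on the cycle
    }

    /** Records arriving over the feedback edge from the iteration tail/sink. */
    public void onFeedbackRecord(String record) {
        if (onSnapshot) {
            log.add(record);        // logging mode: buffer, do not forward
        } else {
            downstream.add(record); // normal mode: forward as usual
        }
    }

    /** Assumed step 2: the barrier returns over the feedback edge and completes the snapshot. */
    public void onFeedbackBarrier(long checkpointId) {
        snapshots.put(checkpointId, new ArrayList<>(log)); // logged records become part of the snapshot
        onSnapshot = false;
        downstream.addAll(log); // back to normal mode: release the buffered records
        log.clear();
    }

    public List<Object> downstream() {
        return downstream;
    }

    public List<String> snapshot(long checkpointId) {
        return snapshots.get(checkpointId);
    }
}
```

In this sketch, a record fed back before the barrier flows straight through, records fed back after it are held until the barrier completes the loop, and the snapshot for that checkpoint contains exactly those held records.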