[GitHub] flink issue #1668: [FLINK-3257] Add Exactly-Once Processing Guarantees for I...
Github user senorcarbone commented on the issue: https://github.com/apache/flink/pull/1668 Well, major runtime changes are coming with FLIP-6, 15 and 16, so I would suggest you watch those. Loop FT will be included in one of these, along with other loop redesign features. ---
Github user dikei commented on the issue: https://github.com/apache/flink/pull/1668 Hi. Do we have any updates on this? :) ---
Github user senorcarbone commented on the issue: https://github.com/apache/flink/pull/1668 sweet! thanks @StefanRRichter --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/1668 For raw operator state, override `AbstractStreamOperator::snapshotState(StateSnapshotContext context)` inside your operator. Your implementation first calls super; then it can obtain the raw stream via `context.getRawOperatorStateOutput()`. This stream works like a normal output stream, except that you can also call `stream.startNewPartition()`. This signals that a new partition is started and previous partitions are finalized/immutable. Partitions are the atomic units of state redistribution; think of them as the individual elements in a `ListCheckpointed` state. For restoring, override `AbstractStreamOperator::initializeState(StateInitializationContext context)`. After calling super, `context.getRawOperatorStateInputs()` provides an iterable with one input stream per partition that your operator should restore. ---
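The write-partitions-then-restore-per-partition flow described above can be sketched with a stdlib-only stand-in (all names here are hypothetical, and the length-prefixed framing is purely illustrative, not Flink's actual raw-state format):

```python
import io
import struct

class RawStateWriter:
    """Write raw state as a sequence of partitions (length-prefixed frames).

    Mimics the idea of stream.startNewPartition(): each partition is an
    atomic, immutable unit of redistributable state.
    """
    def __init__(self):
        self._buf = io.BytesIO()
        self._current = None

    def start_new_partition(self):
        # Finalize the previous partition (if any) and open a new one.
        self._flush_current()
        self._current = io.BytesIO()

    def write(self, data: bytes):
        if self._current is None:
            self.start_new_partition()
        self._current.write(data)

    def _flush_current(self):
        if self._current is not None:
            payload = self._current.getvalue()
            self._buf.write(struct.pack(">I", len(payload)))
            self._buf.write(payload)
            self._current = None

    def finish(self) -> bytes:
        self._flush_current()
        return self._buf.getvalue()

def read_partitions(blob: bytes):
    """Yield one stream per partition, analogous to the iterable that
    getRawOperatorStateInputs() provides on restore."""
    view = io.BytesIO(blob)
    while True:
        header = view.read(4)
        if not header:
            return
        (length,) = struct.unpack(">I", header)
        yield io.BytesIO(view.read(length))

# Snapshot two partitions, then restore them one stream at a time.
w = RawStateWriter()
w.start_new_partition()
w.write(b"state-a")
w.start_new_partition()
w.write(b"state-b")
blob = w.finish()
parts = [p.read() for p in read_partitions(blob)]
```

The key property this illustrates is that a partition becomes immutable once the next one starts, so restore can hand each partition to a different operator instance independently.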
Github user senorcarbone commented on the issue: https://github.com/apache/flink/pull/1668 Thanks for the review @gyfora and @StephanEwen, these are very good points. @StephanEwen it makes sense not to index/keep metadata of individual records in log slices; it is extra overhead. Writing raw operator state makes sense, so I will do that once @StefanRRichter gives me some pointers; that would be great. Any redistribution of the checkpoint slices would violate causality, so I hope the "list redistribution pattern" actually keeps the set of registered operator states per instance intact. The garbage collection issue still remains, but maybe (if @StefanRRichter approves) I can add an `unregister` functionality to the `OperatorStateStore`. I can also add preconfigured operators (not that they will be reused anywhere). It is cleaner, but I really need to see how I can get full control of the task checkpointing behaviour from the operator level (since the default checkpointing behaviour is altered at the task level). ---
Github user senorcarbone commented on the issue: https://github.com/apache/flink/pull/1668 Hey. Any update/opinion/something, anyone? Just a gentle reminder, sorry if this sounds a bit desperate :) ---
Github user senorcarbone commented on the issue: https://github.com/apache/flink/pull/1668 I have just rebased to the current master. Please let me wrap this up. It has been over a year :) Unregistering state in the OperatorStateStore is a very tiny fix. @StephanEwen @StefanRRichter Is it ok with you to make this small addition in this PR, or should I create a separate issue? Hope you have a spare few minutes to take a quick look this time. ---
Github user senorcarbone commented on the issue: https://github.com/apache/flink/pull/1668 Hey @addisonj. Sure! You could perhaps review the changes, and maybe look into how to discard empty operator states if you are motivated. This is the only pending issue for this PR. Thanks! ---
Github user addisonj commented on the issue: https://github.com/apache/flink/pull/1668 Very interested in this work. It sounds like there are a few loose ends and then some cleanup before it might be ready for merge. @senorcarbone or @StephanEwen, is there anything that can be supported by someone else? Would love to help wherever possible. ---
Github user senorcarbone commented on the issue: https://github.com/apache/flink/pull/1668 The last update implements a variant of what @StephanEwen proposes. We have put some more thought into this offline too, thanks to @gyfora! The idea is that instead of putting records into each `ListState`, the output log is partitioned into multiple log "slices", one per concurrent checkpoint. More specifically, the `UpstreamLogger` operator at the `IterationHead` slices logs proportionally to the number of concurrent snapshots. This also allows committed output logs to be uniquely identified and cleared after each complete checkpoint. The design is based on the following assumptions:
- A slice is named after a checkpoint ID. Checkpoint IDs are numerically ordered within an execution.
- Each checkpoint barrier arrives back in FIFO order, thus we discard log slices in the respective FIFO order.
- Upon restoration, the logger sorts sliced logs in the same FIFO order and returns an Iterable that gives a singular view of the log.

Before I polish this we need to close a memory leak. The `clear` operation of `State` clears the state under the registered id, but it does not seem to unregister the key itself. Does anyone have an idea of how to unregister state properly? Hope this gets some attention so we can wrap it up; it's been too long :). ---
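The slicing scheme described above (slices named by checkpoint ID, FIFO discard on barrier return, ordered replay on restore) can be sketched as a small stand-alone model (plain Python with hypothetical names, not the actual `UpstreamLogger` code):

```python
from itertools import chain

class SlicedUpstreamLog:
    """Output log partitioned into per-checkpoint slices.

    A slice is named after its checkpoint ID; records go only to the
    most recent slice, barriers returning in FIFO order discard slices
    up to their ID, and restore replays slices in ID order.
    """
    def __init__(self):
        self._slices = {}  # checkpoint_id -> records logged while current

    def start_slice(self, checkpoint_id):
        # A new concurrent checkpoint opens a fresh slice.
        self._slices[checkpoint_id] = []

    def log(self, record):
        # No duplication across slices: append to the latest slice only.
        latest = max(self._slices)
        self._slices[latest].append(record)

    def barrier_returned(self, checkpoint_id):
        # Barriers come back in FIFO order, so committed slices up to
        # this ID can be uniquely identified and garbage-collected.
        for cid in [c for c in self._slices if c <= checkpoint_id]:
            del self._slices[cid]

    def restore_view(self):
        # Sorted, singular view over the remaining slices on restore.
        return chain.from_iterable(
            self._slices[c] for c in sorted(self._slices))

log = SlicedUpstreamLog()
log.start_slice(1)
log.log("a")
log.log("b")
log.start_slice(2)
log.log("c")
log.barrier_returned(1)  # checkpoint 1 complete: slice 1 discarded
remaining = list(log.restore_view())
```

Because slices are keyed by checkpoint ID and discarded wholesale, no per-record metadata is needed, matching the earlier point about avoiding indexing individual records.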
Github user senorcarbone commented on the issue: https://github.com/apache/flink/pull/1668 @StephanEwen could you check my question above? ---
Github user senorcarbone commented on the issue: https://github.com/apache/flink/pull/1668 Working on it atm. I decided to make the following optimisations, but first want to very quickly make sure that async checkpointing works the way I believe it does:
- Most importantly, I am changing the iteration head to always forward records. Their effects are not present in any in-progress snapshot anyway, which is what I should have done from the very beginning. :)
- If `ListState` is checkpointed asynchronously (depending on the backend, I suppose), then the current version of it is persisted as a copy during the snapshot, which means that we can apply mutations right away and therefore reset it immediately after invoking the snapshot, at the beginning of the next in-progress snapshot (some indexing involved). That way we do not need to open new ListStates in the first place.

Does this make sense? @StephanEwen please correct me if I am wrong regarding the second point. I am just not very familiar with async snapshotting for `ListState` (this is not clear to me from the documentation). Mind also that I do not use the `CheckpointedAsynchronously` interface; it seems to be heading towards deprecation. Thanks! ---
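The copy-on-snapshot semantics assumed in the second point can be illustrated with a minimal stand-in (hypothetical class, not the actual state-backend code): the snapshot materializes an immutable copy that a background thread could persist, so the live state can be reset immediately.

```python
class AsyncSnapshottableList:
    """Minimal model of the assumed async-snapshot semantics: taking a
    snapshot freezes a copy, so mutations right after the snapshot do
    not affect the in-progress checkpoint."""
    def __init__(self):
        self._live = []

    def add(self, value):
        self._live.append(value)

    def snapshot(self):
        # The async writer would persist this immutable copy...
        frozen = tuple(self._live)
        # ...so the live list can be reset right away for the next
        # in-progress snapshot.
        self._live.clear()
        return frozen

    def contents(self):
        return list(self._live)

state = AsyncSnapshottableList()
state.add(1)
state.add(2)
copy1 = state.snapshot()
state.add(3)  # mutation right after the snapshot is safe
```

If the backend really does copy at snapshot time, `copy1` keeps `(1, 2)` while the live state moves on, which is exactly what makes resetting (rather than opening new ListStates) viable.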
Github user senorcarbone commented on the issue: https://github.com/apache/flink/pull/1668 Agreed, @StephanEwen! I will do that. ---
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/1668 @senorcarbone I agree, let's fix the multiple-checkpoints issue and do the rest in FLIP-15. The other operators have a pretty simple way of doing this:
- For synchronously checkpointed operators, there is no need to do anything: the synchronous part of one checkpoint is over when the next starts (because it is synchronous ;-)).
- For asynchronously checkpointed state, the state backend needs to be able to hold multiple snapshots, which are saved by multiple background threads.
- None of the operators deal with in-flight data, which makes their job easy.

Dealing with in-flight data probably means that you need to open a ListState for each checkpoint that arrives and add the fed-back values to each state, until that particular checkpoint's barrier comes back through the feedback channel. I think that should be sufficient. ---
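The suggested scheme, one open log per in-flight checkpoint, with every feedback record added to all open logs until that checkpoint's barrier returns, can be sketched as (hypothetical names, plain Python rather than Flink's `ListState` API):

```python
class PerCheckpointFeedbackLog:
    """One open log per in-flight checkpoint. A feedback record is
    added to every open log; a log is closed when its checkpoint's
    barrier comes back through the feedback channel."""
    def __init__(self):
        self._open = {}  # checkpoint_id -> in-flight records

    def checkpoint_started(self, checkpoint_id):
        # A new checkpoint barrier arrived at the head: open its log.
        self._open[checkpoint_id] = []

    def feedback_record(self, record):
        # In-flight data must be captured by every in-progress checkpoint.
        for log in self._open.values():
            log.append(record)

    def barrier_returned(self, checkpoint_id):
        # This checkpoint's in-flight data is now fully logged; close it.
        return self._open.pop(checkpoint_id)

log = PerCheckpointFeedbackLog()
log.checkpoint_started(1)
log.feedback_record("a")      # only checkpoint 1 is open
log.checkpoint_started(2)     # savepoint concurrent with checkpoint 1
log.feedback_record("b")      # both open checkpoints record it
done = log.barrier_returned(1)
```

Note the contrast with the slicing variant implemented later in the thread: here records are duplicated into every open log, which is simpler but costs extra copies when checkpoints overlap.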
Github user senorcarbone commented on the issue: https://github.com/apache/flink/pull/1668 These are some good points @StephanEwen, thanks for checking it. How about the following, regarding each issue:
- `Concurrent Checkpoints`: Looks like an improvement, but I can certainly do it in this PR if it is a crucial one. Can you elaborate a bit more, or point me to other examples of operator state with concurrent checkpointing, to get an idea of how you want it done?
- `Reconfiguration`: Sounds interesting, but I am not really aware of it from the dev list. If it is simple enough I could add support for it here; otherwise I would suggest we address this in a separate JIRA and PR as an improvement. Is there a design document somewhere on how we plan to achieve reconfiguration and repartitioning specifically for operator state?
- `At-most-once blocking queue`: It is obvious from my previous comments that I do not approve of this part, but that is something we already got rid of in [FLIP-15](https://cwiki.apache.org/confluence/display/FLINK/FLIP-15+Scoped+Loops+and+Job+Termination) ([this](https://github.com/FouadMA/flink/commit/9adaac435bcaf3552afe564c739d4e8fd79c433b) commit). How about we address this together with the deadlocks in FLIP-15?
- `Deadlocks`: I like the elastic spilling channel idea for resolving deadlocks. I need time to dig a bit more into this and make sure we actually solve the deadlocks rather than just mitigate them. Is it ok with you if we address that in [FLIP-15](https://cwiki.apache.org/confluence/display/FLINK/FLIP-15+Scoped+Loops+and+Job+Termination)? I need more time for this part; plus, we need to combine the absence of expiring queues with a proper termination algorithm (otherwise we just solve the deadlocks and the jobs never terminate). What do you think? ---
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/1668 To suggest some way to fix the guarantees: to my mind, the crux lies in the way the feedback channel is implemented. A simple blocking queue just does not cut it for this case. To make this proper, I think we need to do the following:
- Have an elastic feedback channel (unbounded) with a certain memory budget, which can spill if needed. I think it would be best implemented holding the data serialized.
- On checkpoint, one simply adds the feedback channel data (already bytes) to the checkpoint.
- The source task should probably prioritize reading from the feedback channel, to keep it always as small as possible. ---
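The elastic, spillable channel outlined above can be sketched in a simplified stand-alone form (hypothetical names; a real implementation would also preserve strict FIFO order across the memory and spill segments, which this sketch glosses over):

```python
import os
import tempfile
from collections import deque

class SpillableFeedbackChannel:
    """Unbounded feedback channel with a memory budget that spills to
    disk. Records are held serialized (bytes), so a checkpoint can
    persist the channel contents as-is."""
    def __init__(self, memory_budget_bytes=10):
        self._budget = memory_budget_bytes
        self._in_memory = deque()
        self._mem_used = 0
        self._spill = tempfile.TemporaryFile()
        self._spilled_count = 0

    def put(self, record: bytes):
        if self._mem_used + len(record) <= self._budget:
            self._in_memory.append(record)
            self._mem_used += len(record)
        else:
            # Over budget: spill a length-prefixed serialized record.
            self._spill.write(len(record).to_bytes(4, "big") + record)
            self._spilled_count += 1

    def snapshot(self):
        # On checkpoint, the already-serialized contents are simply
        # collected (memory segment first, then spilled segment).
        self._spill.seek(0)
        spilled = []
        while True:
            header = self._spill.read(4)
            if not header:
                break
            n = int.from_bytes(header, "big")
            spilled.append(self._spill.read(n))
        self._spill.seek(0, os.SEEK_END)
        return list(self._in_memory) + spilled

chan = SpillableFeedbackChannel(memory_budget_bytes=10)
chan.put(b"aaaa")  # fits in memory (4 of 10 bytes used)
chan.put(b"bbbb")  # fits in memory (8 of 10 bytes used)
chan.put(b"cccc")  # would exceed the budget: spilled to disk
snap = chan.snapshot()
```

Keeping records as bytes is what makes the second bullet cheap: checkpointing needs no re-serialization, only a copy of the raw channel contents.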
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/1668 Thanks for the reminder; I went over the code today. The code looks mostly good, but here are some thoughts:
- The head task supports only one concurrent checkpoint. In general, tasks need to support multiple checkpoints being in progress at the same time; it frequently happens that people trigger savepoints concurrently with a running checkpoint. I think that is important to support.
- The tail task offers the elements to the blocking queue. That means records are simply dropped if the capacity-bounded queue (one element) is not polled by the head task in time.
- With the capacity bound on the feedback queue, it is pretty easy to build a full deadlock: just use a loop function that explodes data into the feedback channel.
- Recent code also introduced the ability to change parallelism. What are the semantics here when the parallelism of the loop is changed?

Since loops did not support any fault tolerance guarantees before, I guess this does improve recovery behavior. But as long as the loops can either deadlock or drop data, the hard guarantees are in the end still a bit weak. So that leaves me a bit ambivalent about what to do with this pull request. ---
Github user senorcarbone commented on the issue: https://github.com/apache/flink/pull/1668 Exactly, these two issues do not depend on each other. No doubt loop FT is the first thing that can enable iterations in a production deployment, so I would merge that first. Thank you again, Gyula, for looking into it :) ---
Github user gyfora commented on the issue: https://github.com/apache/flink/pull/1668 I think the PR looks pretty good, and it sounds fair to address termination in a later PR, as this will still greatly improve the current guarantees without making the backpressure/termination problems any worse. +1 from me. ---
Github user senorcarbone commented on the issue: https://github.com/apache/flink/pull/1668 Hey again, @StephanEwen @uce. When you have 10 minutes, can you take a look to see if this is acceptable? I would not like to leave this here for months again; it has been out way too long already. The changes are few and straightforward, so I really encourage you to skim them at your earliest convenience. Thanks! ---
Github user senorcarbone commented on the issue: https://github.com/apache/flink/pull/1668 Ok, so I am progressing this a bit independently from the termination stuff, and then we rebase onto the first PR that is merged. I just changed everything and rebased to the current master. Some notable changes:
- The `StreamIterationCheckpointingITCase` is now made deterministic: it fails once after the first successful checkpoint, and the job stops after everything has been recovered appropriately.
- I am now using ListState, which is supposed to work like a charm with the RocksDB file backend. Note that with the default in-memory backend there is a high chance of running into issues, given the low memory capacity it is allotted by default.
- One tricky part that could potentially be done better is the way I set the logger in the StreamIterationHead (I had to change the head op field access to `protected` in the OperatorChain).

Whenever you find time, go ahead and check it out. It passes my super-strict test, which is a good thing. :) ---
Github user senorcarbone commented on the issue: https://github.com/apache/flink/pull/1668 Hey! Good to be back :) Let's fix this properly, as @StephanEwen recommended, now that there is some time. Together with @FouadMA we are writing a FLIP to address major loop fixes, namely termination determination and fault tolerance. The termination implementation is already in good shape in my opinion, and you can find it [here](https://github.com/senorcarbone/flink/pull/2#pullrequestreview-1929918) in case you want to take an early look. The description in the FLIP will make clear how this works in detail. The FT update for loops will be rebased on top of the loop termination fix. We hope that you will find this good too, and btw thanks for your patience :) ---