Hi Ashish,

the atomic stop with savepoint is going to be implemented for all state backends.
Cheers,
Till

On Sat, Jun 16, 2018 at 4:29 AM Ashish Pokharel <ashish...@yahoo.com> wrote:

> Hi Till, Fabian,
>
> Thanks for your responses again.
>
> Till, you have nailed it. I will comment on them individually. But first, I feel like I am still not stating it well enough to illustrate the need. Maybe I am overthinking :)
>
> So let me try one more time, with a preface that we are talking about millions of sensors reporting logs/metrics. So in my cluster we can potentially have 10s if not 100s of such apps for a variety of data. I currently have 1 app in Prod, so I can do a lot of testing :) Just as a test, I enabled the RocksDB State Backend and Checkpointing every 5 seconds with Graphite metrics enabled. On average I could see almost 25GB of total state being written across a couple of hundred slots based on Graphite numbers - it is set up with incremental and async Checkpoints. I am assuming the main reason is that states are transient and the deltas are essentially an entire set of new states. Our main concern is real-time processing vs. no data loss or even possibly a few duplicates. To Fabian's point, at-least-once vs. exactly-once semantics are also not of utmost concern at this point. Now, the bottom line is that I have Checkpointing disabled and use the MemoryStateBackend, with the thought that writing massive states to persistence every few seconds didn't seem like the best use of resources - I'd rather fit more of these apps in the cluster and use stateful processing for the apps that really need it. However, this leads to 2 main issues:
>
> 1 - If an operator fails (let's say Kafka timeouts), the entire job graph restarts, which leads us to a larger-than-desirable gap of data (lost states across 100s of operators) as obviously there is no recoverable state.
> 2 - The same issue happens on planned restarts.
>
> Per Fabian's suggestion, I am going to try the RocksDB State Backend with local drives and run some baseline tests - hoping states are kept in memory for the most part unless spillover is needed. This should at least give us a decent solution for (2). I am still not convinced we should enable periodic Checkpointing (perhaps I am wrong here, but again I have highlighted my reasons above).
>
> "
>
> Just for my own understanding: What do you want to do with event-based checkpointing triggers (onRestart, onShutdown?). Do you want to draw a partial snapshot which should then be used for recovery? Technically, this is possible but I'm not sure how much value this would add for Flink users. A partial snapshot could also be completely empty (equivalent of disabling checkpointing).
>
> I can see the point of making the checkpoint triggering more flexible and giving some control to the user. In contrast to savepoints, checkpoints are considered for recovery. My question here would be, what would be the triggering condition in your case (other than time)?
>
> "
> I'd think the trigger condition would be based on a life-cycle hook like RESTART (or perhaps even an external message when FLINK-6131 is available, maybe). A partial (best possible) snapshot is exactly what it would be - states from failing operators cannot be expected to be recoverable, obviously.
>
> What the community will add very soon is an atomic stop with savepoint call which will take a savepoint of your job's state and shut it down.
>
> Very nice! Would this also have the same need to use the Fs or RocksDB State Backend? It shouldn't be an issue for us as long as my tests above turn out to be decent.
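
(For reference, a minimal sketch of the setup described above - the RocksDB state backend with incremental checkpoints taken every 5 seconds. The checkpoint URI and the interval are illustrative placeholders, not the production values:)

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RocksDbCheckpointSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Durable checkpoint location (placeholder URI); any supported filesystem works.
            String checkpointDir = "hdfs:///flink/checkpoints";

            // RocksDB keeps working state on local disk; the second argument enables
            // incremental checkpoints so only the delta is shipped to the checkpoint dir.
            env.setStateBackend(new RocksDBStateBackend(checkpointDir, true));

            // Periodic checkpoints every 5 seconds, as in the test described above;
            // RocksDB checkpoints are taken asynchronously.
            env.enableCheckpointing(5000);

            // ... build the pipeline and call env.execute(...) ...
        }
    }

(Whether incremental checkpoints actually reduce the written volume depends on how much of the state churns between checkpoints, which is exactly the effect described above for short-lived, transient state.)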
>
> Thanks again guys for your advice and feedback. Really appreciate it.
>
> — Ashish
>
> On Jun 15, 2018, at 5:43 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>
> Hi,
>
> ideally we would not have to cancel all tasks and redeploy the whole job in case of a restart. Instead we should do what you've outlined: redeploy the failed tasks and reset the state of all other running tasks. At the moment, this is, however, not yet possible. While improving Flink's recovery behavior this should be addressed eventually.
>
> Just for my own understanding: What do you want to do with event-based checkpointing triggers (onRestart, onShutdown?). Do you want to draw a partial snapshot which should then be used for recovery? Technically, this is possible but I'm not sure how much value this would add for Flink users. A partial snapshot could also be completely empty (equivalent of disabling checkpointing).
>
> I can see the point of making the checkpoint triggering more flexible and giving some control to the user. In contrast to savepoints, checkpoints are considered for recovery. My question here would be, what would be the triggering condition in your case (other than time)?
>
> What the community will add very soon is an atomic stop with savepoint call which will take a savepoint of your job's state and shut it down.
>
> Cheers,
> Till
>
> On Thu, Jun 14, 2018 at 4:55 PM Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Ashish,
>>
>> (I think) I understand your requirements, and the approach of just keeping non-failing tasks running is intuitively a good idea. However, this is only an option for use cases that are OK with at-least-once semantics (otherwise, we'd need to reset the state of the still running tasks and hence take checkpoints). Moreover, the distributed task coordination for keeping some tasks running, restarting others, and connecting them is obviously more difficult than "just" canceling the whole job and starting it again.
>>
>> I have to admit that I'm not that familiar with Flink's distributed task coordination. Till in CC knows much more about that. However, I think the question here is how many use cases would benefit from a recovery mode with at-most-once state guarantees, and how much implementation effort it would be to support it.
>>
>> Regarding the savepoints, if you are using the MemoryStateBackend, failure at too large a state size is expected since all state is replicated into the JobManager JVM. Did you try to use the FsStateBackend? It also holds the state on the TaskManager heap but backs it up to a (distributed) filesystem.
>>
>> Best, Fabian
>>
>> 2018-06-14 4:18 GMT+02:00 Ashish Pokharel <ashish...@yahoo.com>:
>>
>>> Hi Fabian,
>>>
>>> Thanks for the prompt response and apologies for the delayed response.
>>>
>>> You wrapped up the bottom lines pretty well - if I were to wrap it up, I'd say "best possible" recovery on "known" restarts, either manual cancel + start OR framework-initiated ones like on operator failures, with these constraints:
>>> - some data loss is OK
>>> - avoid periodic checkpoints, as states are really transient (less than 5 seconds of lifetime if not milliseconds) and almost all events make it to state. I do understand that checkpointing performance has drastically been improved and, with async and RocksDB options, it should technically not add latency in the application, etc.
>>> However, I feel like even with improvements and local checkpointing (which we already are doing) it is a lot of "unused" IOPS/resource utilization, especially if we start to spin up more apps handling similar data sources with similar requirements. At first blush it feels like those resources are better utilized in the cluster for apps with stricter SLAs for data loss and recovery, etc., instead.
>>>
>>> Basically, I suppose I am thinking of a Checkpointing feature that is initiated by certain actions / events rather than periodic ones. Let me know if I am off-base here and I should just enable checkpointing in all of these apps and move on :)
>>>
>>> I tried a Savepoint again and it looks like the issue is caused by the fact that the Memory states are large, as it is throwing an error that states are larger than a certain size. So the solution to (1) will possibly solve (2) as well.
>>>
>>> Thanks again,
>>>
>>> Ashish
>>>
>>> On Jun 7, 2018, at 4:25 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>> Hi Ashish,
>>>
>>> Thanks for the great write-up. If I understood you correctly, there are two different issues that are caused by the disabled checkpointing:
>>>
>>> 1) Recovery from a failure without restarting all operators, to preserve the state in the running tasks
>>> 2) Planned restarts of an application without losing all state (even with disabled checkpointing).
>>>
>>> Ad 1) The community is constantly working on reducing the time for checkpointing and recovery. For 1.5, local task recovery was added, which basically stores a state copy on the local disk which is read in case of a recovery. So, tasks are restarted but don't read the state to restore from distributed storage but from the local disk. AFAIK, this can only be used together with remote checkpoints. I think this might be an interesting option for you if it were possible to write checkpoints only to the local disk and not to remote storage. AFAIK, there are also other efforts to reduce the number of restarted tasks in case of a failure. I guess you've played with other features such as the RocksDBStateBackend and incremental and async checkpoints already.
>>>
>>> Ad 2) It sounds as if savepoints are exactly the feature you are looking for. It would be good to know what exactly did not work for you. The MemoryStateBackend is not suitable for large state sizes because it backs up into the heap memory of the JobManager.
>>>
>>> Best, Fabian
>>>
>>> 2018-06-05 21:57 GMT+02:00 ashish pok <ashish...@yahoo.com>:
>>>
>>>> Fabian, Stephan, All,
>>>>
>>>> I started a discussion a while back around having a form of event-based checkpointing policy that would help us in some of our high volume data pipelines. Here is an effort to put this in front of the community and understand what capabilities can support these types of use cases, how much others feel the same need, and whether this could potentially become a feature that makes it to a user story.
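
(A small aside on the savepoint size error and the FsStateBackend suggestion discussed above: the MemoryStateBackend caps each individual state snapshot at 5 MB by default and keeps all snapshots on the JobManager heap, while the FsStateBackend keeps working state on the TaskManager heap and writes snapshots to a filesystem. A minimal sketch, with an illustrative cap and a placeholder path:)

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.runtime.state.memory.MemoryStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class StateBackendComparisonSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Option A: raise the MemoryStateBackend cap (default is 5 MB per state).
            // Snapshots and savepoints still land on the JobManager heap, so this
            // only postpones the size-limit failure seen above.
            MemoryStateBackend memoryBackend = new MemoryStateBackend(64 * 1024 * 1024); // illustrative cap

            // Option B: FsStateBackend keeps working state on the TaskManager heap and
            // writes checkpoints/savepoints to a (distributed) filesystem. Placeholder URI.
            FsStateBackend fsBackend = new FsStateBackend("hdfs:///flink/checkpoints");

            // The filesystem backend is the safer choice once state outgrows the JobManager heap.
            boolean useFilesystemBackend = true;
            if (useFilesystemBackend) {
                env.setStateBackend(fsBackend);
            } else {
                env.setStateBackend(memoryBackend);
            }
        }
    }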
>>>> *Use Case Summary:*
>>>> - Extremely high volume of data (events from consumer devices with a customer base of over 100M)
>>>> - Multiple events need to be combined using a windowing streaming app, grouped by keys (something like a 5 min floor of the timestamp and unique identifiers for customer devices)
>>>> - "Most" events for a group/key arrive within a few seconds if not milliseconds; however, events can sometimes be delayed or get lost in transport (so delayed event handling and timeouts will be needed)
>>>> - Extremely low (pretty vague, but hopefully the details below clarify it more) data loss is acceptable
>>>> - Because of the volume and the transient nature of the source, checkpointing is turned off (this saves on writes to persistence, as states/sessions are active for only a few seconds during processing)
>>>>
>>>> *Problem Summary:*
>>>> Of course, none of the above is out of the norm for Flink, and as a matter of fact we already have a Flink app doing this. The issue arises when it comes to graceful shutdowns and operator failures (e.g. Kafka timeouts etc.). On operator failures, the entire job graph restarts, which essentially flushes out in-memory states/sessions. I think there is a feature in the works (not sure if it made it to 1.5) to perform selective restarts, which will limit the damage but will still result in data loss. Also, it doesn't help when application restarts are needed. We did try going the savepoint route for explicit restart needs, but I think the MemoryStateBackend ran into issues for larger states or something along those lines (not certain). We obviously cannot recover an operator that actually fails, because its own state could be unrecoverable. However, it feels like Flink already has a lot of the plumbing to help with the overall problem of allowing some sort of recoverable state to handle graceful shutdowns and restarts with minimal data loss.
>>>>
>>>> *Solutions:*
>>>> Some in the community commented on my last email with decent ideas, like having an event-based checkpointing trigger (on shutdown, on restart, etc.) or life-cycle hooks (onCancel, onRestart, etc.) in Functions that can be implemented if this type of behavior is needed.
>>>>
>>>> Appreciate feedback from the community on how useful this might be for others, and from core contributors on their thoughts as well.
>>>>
>>>> Thanks in advance, Ashish
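
(To make the use case above concrete, here is a minimal sketch of that kind of pipeline: events keyed by device id, aggregated in 5-minute event-time windows, with a short grace period for late arrivals. The event type, source elements, timestamps, lateness, and aggregation are all illustrative stand-ins, not the production job:)

    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class DeviceWindowSketch {

        // Stand-in for a device log/metric event (hypothetical shape).
        public static class DeviceEvent {
            public String deviceId;
            public long timestamp;
            public long count;

            public DeviceEvent() {}

            public DeviceEvent(String deviceId, long timestamp, long count) {
                this.deviceId = deviceId;
                this.timestamp = timestamp;
                this.count = count;
            }
        }

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

            // A fixed set of elements keeps the sketch self-contained; the real job would
            // read from Kafka. Watermarks tolerate a few seconds of out-of-order events.
            DataStream<DeviceEvent> events = env
                .fromElements(
                    new DeviceEvent("device-1", 1000L, 1),
                    new DeviceEvent("device-1", 2000L, 1),
                    new DeviceEvent("device-2", 1500L, 1))
                .assignTimestampsAndWatermarks(
                    new BoundedOutOfOrdernessTimestampExtractor<DeviceEvent>(Time.seconds(5)) {
                        @Override
                        public long extractTimestamp(DeviceEvent event) {
                            return event.timestamp;
                        }
                    });

            events
                // Group by device id into 5-minute event-time windows (the "5 min floor of timestamp").
                .keyBy(new KeySelector<DeviceEvent, String>() {
                    @Override
                    public String getKey(DeviceEvent event) {
                        return event.deviceId;
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.minutes(5)))
                // Give delayed events a short grace period instead of dropping them outright.
                .allowedLateness(Time.seconds(30))
                // Toy aggregation: count events per device and window.
                .reduce((a, b) -> new DeviceEvent(a.deviceId, Math.max(a.timestamp, b.timestamp), a.count + b.count))
                .print();

            env.execute("device-aggregation-sketch");
        }
    }

(With checkpointing disabled, the in-flight window contents above are exactly the transient state that is lost when the whole job graph restarts, which is the gap the proposals in this thread are trying to address.)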