Thanks for the feedback Ashish,

I think the closest solution to your problem could be to enable local
recovery [1] and somehow avoid writing remote checkpoints.
In that case, checkpoint data would be written to the local disk but not to
a remote filesystem. In case of a recovery, all tasks are restarted and
load their local state.
AFAIK, it is not possible to disable remote checkpoints yet, but I think
this the closest feature to what you are trying to do.

Best,
Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/state/large_state_tuning.html#task-local-recovery

2018-07-22 18:56 GMT+02:00 Ashish Pokharel <ashish...@yahoo.com>:

> Till, Fabian,
>
> Looping back after a gap on this, for some reason this looks like a need
> very specific to us (I would have thought this would be of interest to
> others as well). We on-boarded one of our new IoT data sources and our
> total checkpoints right now are over 1TB and checkpoint period is 5 seconds
> - those are with delta states enabled (I explained how transient the states
> are previously). I sincerely don’t see any need of this, especially given
> tolerances we have for little loss/dups and also the fact that we are going
> to on-board a few data sources at this scale. We switched over to local
> SSDs on our cluster just to isolate this use case from destroying our HDFS
> :)
>
> Of source it is easier said than done, but event based Checkpoint (eg: we
> are able to checkpoint when RESTART happens etc) as discussed below would
> be great.
>
> Thanks, Ashish
>
>
> On Jun 15, 2018, at 10:28 PM, Ashish Pokharel <ashish...@yahoo.com> wrote:
>
> Hi Till, Fabian,
>
> Thanks for your responses again.
>
> Till, you have nailed it. I will comment on them individually. But first,
> I feel like I am still not stating it well enough to illustrate the need.
> May be I am overthinking :)
>
> So let me try one more time with a preface that we are talking about
> millions of sensors reporting logs/metrics. So in my cluster we can
> potentially have 10s if not 100s of such apps for variety of data. I
> currently have 1 app in Prod so I can do a lot testing :) Just as a test, I
> enabled RocksDB State Backend and Checkpointing every 5 seconds with
> Graphite metrics enabled. On an average I could see almost 25GB of total
> state being written across couple of hundred slots based on Graphite
> numbers - it is setup with incremental and async Checkpoints. I am assuming
> main reason being states are transient and deltas are essentially entire
> set of new states. Our main concern is real-time processing vs no data loss
> or even possibly a few duplicates. To Fabian’s point, at least once vs
> exactly once semantics are also not of utmost concern at this point. Now,
> bottom line is I have Checkpointing disabled and use MemoryStateBackend
> with the thought that writing massive states to persistence every few
> seconds didn’t seem like best use of resources - I’d rather fit in more of
> these apps in cluster and use stateful processing for apps we really need
> them on. However, this leads to 2 main issue
>
> 1- If an operator fails (let’s say Kafka timeouts), entire job graph
> restarts which leads us to more than desirable gap of data (lost states
> across 100s of operators) as obviously there is no recoverable state
> 2- Same issue happens in planned restarts
>
> Per Fabian’s suggestion, I am going to try RocksDB State Backend with
> local drives and run some baseline tests - hoping states are kept in memory
> for the most part unless spillover is needed. This should at least allow us
> with decent solution of (2). I am still not convinced we should enable
> periodic Checkpointing (perhaps I am wrong here but again I have
> highlighted my reasons above).
>
> "
>
> Just for my own understanding: What do you want to do with event-based
> checkpointing triggers (onRestart, onShutdown?). Do you want to draw a
> partial snapshot which should then be used for recovery? Technically, this
> is possible but I'm not sure how much value this would add for Flink users.
> A partial snapshot could also be completely empty (equivalent of disabling
> checkpointing).
>
>
> I can see the point of making the checkpoint triggering more flexible and
> giving some control to the user. In contrast to savepoints, checkpoints are
> considered for recovery. My question here would be, what would be the
> triggering condition in your case (other than time)?
>
> "
> I’d think trigger condition would be based on life-cycle hook like RESTART
> (or perhaps even an external message when FLINK-6131 is available may be).
> Partial (best possible) snapshot is exactly what it would be - states from
> failing operators cannot be expected to be recoverable obviously.
>
> What the community will add very soon is an atomic stop with savepoint
> call which will take a savepoint of your job's state when and shut it down.
>
>
> Very nice! Would this also have same need to use Fs or RocksDB State
> Backend? It shouldn’t be an issue for us as long as my tests above turn out
> to be decent.
>
> Thanks again guys for your advice and feedback. Really appreciate it.
>
> — Ashish
>
>
>
> On Jun 15, 2018, at 5:43 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>
> Hi,
>
> ideally we would not have to cancel all tasks and only redeploy the whole
> job in case of a restart. Instead we should do what you've outlined:
> Redeploy the failed tasks and reset the state of all other running tasks.
> At the moment, this is, however, not yet possible. While improving Flink's
> recovery behavior this should be addressed eventually.
>
> Just for my own understanding: What do you want to do with event-based
> checkpointing triggers (onRestart, onShutdown?). Do you want to draw a
> partial snapshot which should then be used for recovery? Technically, this
> is possible but I'm not sure how much value this would add for Flink users.
> A partial snapshot could also be completely empty (equivalent of disabling
> checkpointing).
>
> I can see the point of making the checkpoint triggering more flexible and
> giving some control to the user. In contrast to savepoints, checkpoints are
> considered for recovery. My question here would be, what would be the
> triggering condition in your case (other than time)?
>
> What the community will add very soon is an atomic stop with savepoint
> call which will take a savepoint of your job's state when and shut it down.
>
> Cheers,
> Till
>
> On Thu, Jun 14, 2018 at 4:55 PM Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Ashish,
>>
>> (I think) I understand your requirements and the approach of just keep
>> non-failing tasks running is intuitively a good idea.
>> However, this is only an option for use cases that are OK with
>> at-least-once semantics (otherwise, we'd need to reset the state of the
>> still running tasks and hence take checkpoints).
>> Moreover, the distributed task coordination for keeping some tasks
>> running, restarting others, and connecting them is obviously more difficult
>> than "just" canceling the whole job and starting it again.
>>
>> I have to admit that I'm not that familiar with Flink's distributed task
>> coordination. Till in CC knows much more about that.
>> However, I think the question here is, how many use cases would benefit
>> from a recovery mode with at-most-once state guarantees and how much
>> implementation effort would it be to support it.
>>
>> Regarding the savepoints, if you are using the MemoryStateBackend failure
>> at too large state size is expected since all state is replicated into the
>> JobManager JVM.
>> Did you try to use the FsStateBackend? It also holds the state on the
>> TaskManager heap but backups it to a (distributed) filesystem.
>>
>> Best, Fabian
>>
>> 2018-06-14 4:18 GMT+02:00 Ashish Pokharel <ashish...@yahoo.com>:
>>
>>> Hi Fabian,
>>>
>>> Thanks for the prompt response and apologies for delayed response.
>>>
>>> You wrapped up the bottom lines pretty well - if I were to wrap it up
>>> I’d say “best possible” recovery on “known" restarts either say manual
>>> cancel + start OR framework initiated ones like on operator failures with
>>> these constraints
>>>  - some data loss is ok
>>>  - avoid periodic checkpoints as states are really transient (less than
>>> 5 seconds of lifetime if not milliseconds) and almost all events make it to
>>> state. I do understand that checkpointing performance has drastically been
>>> improved and with async and RocksDB options, it should technically not add
>>> latency in application etc. However, I feel like even with improvements and
>>> local checkpointing (which we already are doing) it is a lot of “unused”
>>> IOPS/resource utilization especially if we start to spin up more apps
>>> handling similar data sources and with similar requirements. On a first
>>> blush it feels like those resources are better utilized in cluster for apps
>>> with stricter SLAs for data loss and recovery etc instead.
>>>
>>> Basically, I suppose I am thinking Checkpointing feature that is
>>> initialized by certain actions / events rather than periodic ones. Let me
>>> know I am off-base here and I should just enable checkpointing in all of
>>> these apps and move on :)
>>>
>>> I tried Savepoint again and it looks like the issue is caused by the
>>> fact that Memory states are large as it is throwing error states are larger
>>> than certain size. So solution of (1) will possibly solve (2) as well.
>>>
>>> Thanks again,
>>>
>>> Ashish
>>>
>>>
>>> On Jun 7, 2018, at 4:25 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>> Hi Ashish,
>>>
>>> Thanks for the great write up.
>>> If I understood you correctly, there are two different issues that are
>>> caused by the disabled checkpointing.
>>>
>>> 1) Recovery from a failure without restarting all operators to preserve
>>> the state in the running tasks
>>> 2) Planned restarts an application without losing all state (even with
>>> disabled checkpointing).
>>>
>>> Ad 1) The community is constantly working on reducing the time for
>>> checkpointing and recovery.
>>> For 1.5, local task recovery was added, which basically stores a state
>>> copy on the local disk which is read in case of a recovery. So, tasks are
>>> restarted but don't read the to restore state from distributed storage but
>>> from the local disk.
>>> AFAIK, this can only be used together with remote checkpoints. I think
>>> this might be an interesting option for you if it would be possible to
>>> write checkpoints only to local disk and not remote storage. AFAIK, there
>>> are also other efforts to reduce the number of restarted tasks in case of a
>>> failure. I guess, you've played with other features such as
>>> RocksDBStateBackend, incremental and async checkpoints already.
>>>
>>> Ad 2) It sounds as if savepoints are exactly the feature your are
>>> looking for. It would be good to know what exactly did not work for you.
>>> The MemoryStateBackend is not suitable for large state sizes because it
>>> backups into the heap memory of the JobManager.
>>>
>>> Best, Fabian
>>>
>>> 2018-06-05 21:57 GMT+02:00 ashish pok <ashish...@yahoo.com>:
>>>
>>>> Fabian, Stephan, All,
>>>>
>>>> I started a discussion a while back around having a form of event-based
>>>> checkpointing policy that will help us in some of our high volume data
>>>> pipelines. Here is an effort to put this in front of community and
>>>> understand what capabilities can support these type of use cases, how much
>>>> others feel the same need and potentially a feature that can make it to a
>>>> user story.
>>>>
>>>> *Use Case Summary:*
>>>> - Extremely high volume of data (events from consumer devices with
>>>> customer base of over 100M)
>>>> - Multiple events need to be combined using a windowing streaming app
>>>> grouped by keys (something like 5 min floor of timestamp and unique
>>>> identifiers for customer devices)
>>>> - "Most" events by a group/key arrive in few seconds if not
>>>> milliseconds however events can sometimes delay or get lost in transport
>>>> (so delayed event handling and timeouts will be needed)
>>>> - Extremely low (pretty vague but hopefully details below clarify it
>>>> more) data loss is acceptable
>>>> - Because of the volume and transient nature of source, checkpointing
>>>> is turned off (saves on writes to persistence as states/sessions are active
>>>> for only few seconds during processing)
>>>>
>>>> *Problem Summary:*
>>>> Of course, none of the above is out of the norm for Flink and as a
>>>> matter of factor we already have a Flink app doing this. The issue arises
>>>> when it comes to graceful shutdowns and on operator failures (eg: Kafka
>>>> timeouts etc.) On operator failures, entire job graph restarts which
>>>> essentially flushes out in-memory states/sessions. I think there is a
>>>> feature in works (not sure if it made it to 1.5) to perform selective
>>>> restarts which will control the damage but still will result in data loss.
>>>> Also, it doesn't help when application restarts are needed. We did try
>>>> going savepoint route for explicit restart needs but I think
>>>> MemoryBackedState ran into issues for larger states or something along
>>>> those line(not certain). We obviously cannot recover an operator that
>>>> actually fails because it's own state could be unrecoverable. However, it
>>>> feels like Flink already has a lot of plumbing to help with overall problem
>>>> of allowing some sort of recoverable state to handle graceful shutdowns and
>>>> restarts with minimal data loss.
>>>>
>>>> *Solutions:*
>>>> Some in community commented on my last email with decent ideas like
>>>> having an event-based checkpointing trigger (on shutdown, on restart etc)
>>>> or life-cycle hooks (onCancel, onRestart etc) in Functions that can be
>>>> implemented if this type of behavior is needed etc.
>>>>
>>>> Appreciate feedback from community on how useful this might be for
>>>> others and from core contributors on their thoughts as well.
>>>>
>>>> Thanks in advance, Ashish
>>>>
>>>>
>>>
>>>
>>
>
>

Reply via email to