Re: IoT Use Case, Problem and Thoughts

2018-07-23 Thread Fabian Hueske
Thanks for the feedback Ashish,

I think the closest solution to your problem could be to enable local
recovery [1] and somehow avoid writing remote checkpoints.
In that case, checkpoint data would be written to the local disk but not to
a remote filesystem. In case of a recovery, all tasks are restarted and
load their local state.
AFAIK, it is not possible to disable remote checkpoints yet, but I think
this is the closest feature to what you are trying to do.
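
As a rough illustration, this is what enabling task-local recovery might look
like, assuming the Flink 1.5 config keys from the task-local recovery docs
linked below; on a real cluster these keys would normally go into
flink-conf.yaml, and the directory path is just a placeholder:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalRecoverySketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Keep a local copy of each task's state for recovery (Flink 1.5+).
        conf.setString("state.backend.local-recovery", "true");
        // Local directory on the TaskManager used for those state copies.
        conf.setString("taskmanager.state.local.root-dirs", "/data/flink/local-recovery");

        // A local environment is used only to keep the sketch self-contained.
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(4, conf);

        // Local recovery still requires checkpointing to be enabled; as noted
        // above, the remote copy cannot be skipped yet.
        env.enableCheckpointing(5_000);

        // ... build and execute the job as usual ...
    }
}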

Best,
Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/state/large_state_tuning.html#task-local-recovery

2018-07-22 18:56 GMT+02:00 Ashish Pokharel :

> Till, Fabian,
>
> Looping back after a gap on this, for some reason this looks like a need
> very specific to us (I would have thought this would be of interest to
> others as well). We on-boarded one of our new IoT data sources and our
> total checkpoints right now are over 1TB with a checkpoint period of 5 seconds
> - those are with delta states enabled (I explained how transient the states
> are previously). I sincerely don’t see any need for this, especially given
> the tolerance we have for a little loss/dups and also the fact that we are
> going to on-board a few data sources at this scale. We switched over to local
> SSDs on our cluster just to isolate this use case from destroying our HDFS
> :)
>
> Of course it is easier said than done, but an event-based Checkpoint (eg:
> being able to checkpoint when a RESTART happens etc) as discussed below would
> be great.
>
> Thanks, Ashish

Re: IoT Use Case, Problem and Thoughts

2018-07-22 Thread Ashish Pokharel
Till, Fabian,

Looping back after a gap on this, for some reason this looks like a need very 
specific to us (I would have thought this would be of interest to others as 
well). We on-boarded one of our new IoT data sources and our total checkpoints
right now are over 1TB with a checkpoint period of 5 seconds - those are with
delta states enabled (I explained how transient the states are previously). I
sincerely don’t see any need for this, especially given the tolerance we have
for a little loss/dups and also the fact that we are going to on-board a few
data sources at this scale. We switched over to local SSDs on our cluster just
to isolate this use case from destroying our HDFS :)

Of course it is easier said than done, but an event-based Checkpoint (eg: being
able to checkpoint when a RESTART happens etc) as discussed below would be
great.

Thanks, Ashish


> On Jun 15, 2018, at 10:28 PM, Ashish Pokharel  wrote:
> 
> Hi Till, Fabian,
> 
> Thanks for your responses again. 
> 
> Till, you have nailed it. I will comment on them individually. But first, I
> feel like I am still not stating it well enough to illustrate the need. Maybe
> I am overthinking :)
> 
> So let me try one more time with a preface that we are talking about millions
> of sensors reporting logs/metrics. So in my cluster we can potentially have
> 10s if not 100s of such apps for a variety of data. I currently have 1 app in
> Prod so I can do a lot of testing :) Just as a test, I enabled the RocksDB
> State Backend and Checkpointing every 5 seconds with Graphite metrics enabled.
> On average I could see almost 25GB of total state being written across a
> couple of hundred slots based on the Graphite numbers - it is set up with
> incremental and async Checkpoints. I am assuming the main reason is that
> states are transient and the deltas are essentially the entire set of new
> states. Our main concern is real-time processing vs no data loss or even
> possibly a few duplicates. To Fabian’s point, at-least-once vs exactly-once
> semantics are also not of utmost concern at this point. Now, the bottom line
> is that I have Checkpointing disabled and use the MemoryStateBackend with the
> thought that writing massive states to persistence every few seconds didn’t
> seem like the best use of resources - I’d rather fit more of these apps in the
> cluster and use stateful processing for the apps that really need it. However,
> this leads to 2 main issues:
>
> 1- If an operator fails (let’s say Kafka timeouts), the entire job graph
> restarts, which leaves us with a larger-than-desirable gap in data (lost
> states across 100s of operators) as obviously there is no recoverable state
> 2- The same issue happens in planned restarts
>
> Per Fabian’s suggestion, I am going to try the RocksDB State Backend with
> local drives and run some baseline tests - hoping states are kept in memory
> for the most part unless spillover is needed. This should at least give us a
> decent solution for (2). I am still not convinced we should enable periodic
> Checkpointing (perhaps I am wrong here but again I have highlighted my
> reasons above).
> 
> "
>> Just for my own understanding: What do you want to do with event-based 
>> checkpointing triggers (onRestart, onShutdown?). Do you want to draw a 
>> partial snapshot which should then be used for recovery? Technically, this 
>> is possible but I'm not sure how much value this would add for Flink users. 
>> A partial snapshot could also be completely empty (equivalent of disabling 
>> checkpointing).
> 
>> I can see the point of making the checkpoint triggering more flexible and 
>> giving some control to the user. In contrast to savepoints, checkpoints are 
>> considered for recovery. My question here would be, what would be the 
>> triggering condition in your case (other than time)?
> "
> I’d think the trigger condition would be based on a life-cycle hook like
> RESTART (or perhaps even an external message when FLINK-6131 is available). A
> partial (best possible) snapshot is exactly what it would be - states from
> failing operators obviously cannot be expected to be recoverable.
>
>> What the community will add very soon is an atomic stop with savepoint call
>> which will take a savepoint of your job's state and then shut it down.
>
> Very nice! Would this also require the Fs or RocksDB State Backend? It
> shouldn’t be an issue for us as long as my tests above turn out to be
> decent.
> 
> Thanks again guys for your advice and feedback. Really appreciate it.
> 
> — Ashish

Re: IoT Use Case, Problem and Thoughts

2018-06-18 Thread Till Rohrmann
Hi Ashish,

the atomic stop with savepoint is going to be implemented for all
state backends.

Cheers,
Till

On Sat, Jun 16, 2018 at 4:29 AM Ashish Pokharel  wrote:

> Hi Till, Fabian,
>
> Thanks for your responses again.
>
> Till, you have nailed it. I will comment on them individually. But first,
> I feel like I am still not stating it well enough to illustrate the need.
> Maybe I am overthinking :)
>
> So let me try one more time with a preface that we are talking about millions
> of sensors reporting logs/metrics. So in my cluster we can potentially have
> 10s if not 100s of such apps for a variety of data. I currently have 1 app in
> Prod so I can do a lot of testing :) Just as a test, I enabled the RocksDB
> State Backend and Checkpointing every 5 seconds with Graphite metrics enabled.
> On average I could see almost 25GB of total state being written across a
> couple of hundred slots based on the Graphite numbers - it is set up with
> incremental and async Checkpoints. I am assuming the main reason is that
> states are transient and the deltas are essentially the entire set of new
> states. Our main concern is real-time processing vs no data loss or even
> possibly a few duplicates. To Fabian’s point, at-least-once vs exactly-once
> semantics are also not of utmost concern at this point. Now, the bottom line
> is that I have Checkpointing disabled and use the MemoryStateBackend with the
> thought that writing massive states to persistence every few seconds didn’t
> seem like the best use of resources - I’d rather fit more of these apps in the
> cluster and use stateful processing for the apps that really need it. However,
> this leads to 2 main issues:
>
> 1- If an operator fails (let’s say Kafka timeouts), the entire job graph
> restarts, which leaves us with a larger-than-desirable gap in data (lost
> states across 100s of operators) as obviously there is no recoverable state
> 2- The same issue happens in planned restarts
>
> Per Fabian’s suggestion, I am going to try the RocksDB State Backend with
> local drives and run some baseline tests - hoping states are kept in memory
> for the most part unless spillover is needed. This should at least give us a
> decent solution for (2). I am still not convinced we should enable periodic
> Checkpointing (perhaps I am wrong here but again I have highlighted my
> reasons above).
>
> "
>
> Just for my own understanding: What do you want to do with event-based
> checkpointing triggers (onRestart, onShutdown?). Do you want to draw a
> partial snapshot which should then be used for recovery? Technically, this
> is possible but I'm not sure how much value this would add for Flink users.
> A partial snapshot could also be completely empty (equivalent of disabling
> checkpointing).
>
>
> I can see the point of making the checkpoint triggering more flexible and
> giving some control to the user. In contrast to savepoints, checkpoints are
> considered for recovery. My question here would be, what would be the
> triggering condition in your case (other than time)?
>
> "
> I’d think the trigger condition would be based on a life-cycle hook like
> RESTART (or perhaps even an external message when FLINK-6131 is available). A
> partial (best possible) snapshot is exactly what it would be - states from
> failing operators obviously cannot be expected to be recoverable.
>
> What the community will add very soon is an atomic stop with savepoint
> call which will take a savepoint of your job's state and then shut it down.
>
>
> Very nice! Would this also require the Fs or RocksDB State Backend? It
> shouldn’t be an issue for us as long as my tests above turn out to be decent.
>
> Thanks again guys for your advice and feedback. Really appreciate it.
>
> — Ashish
>
>
>
> On Jun 15, 2018, at 5:43 AM, Till Rohrmann  wrote:
>
> Hi,
>
> ideally we would not have to cancel all tasks and redeploy the whole job in
> case of a restart. Instead we should do what you've outlined: redeploy the
> failed tasks and reset the state of all other running tasks. At the moment,
> this is, however, not yet possible. While improving Flink's recovery behavior,
> this should be addressed eventually.
>
> Just for my own understanding: What do you want to do with event-based
> checkpointing triggers (onRestart, onShutdown?). Do you want to draw a
> partial snapshot which should then be used for recovery? Technically, this
> is possible but I'm not sure how much value this would add for Flink users.
> A partial snapshot could also be completely empty (equivalent of disabling
> checkpointing).
>
> I can see the point of making the checkpoint triggering more flexible and
> giving some control to the user. In contrast to savepoints, checkpoints are
> considered for recovery. My question here would be, what would be the
> triggering condition in your case (other than time)?
>
> What the community will add very soon is an atomic stop with savepoint call,
> which will take a savepoint of your job's state and then shut it down.
>
> Cheers,
> Till
>
> 

Re: IoT Use Case, Problem and Thoughts

2018-06-15 Thread Ashish Pokharel
Hi Till, Fabian,

Thanks for your responses again. 

Till, you have nailed it. I will comment on them individually. But first, I
feel like I am still not stating it well enough to illustrate the need. Maybe
I am overthinking :)

So let me try one more time with a preface that we are talking about millions
of sensors reporting logs/metrics. So in my cluster we can potentially have 10s
if not 100s of such apps for a variety of data. I currently have 1 app in Prod
so I can do a lot of testing :) Just as a test, I enabled the RocksDB State
Backend and Checkpointing every 5 seconds with Graphite metrics enabled. On
average I could see almost 25GB of total state being written across a couple of
hundred slots based on the Graphite numbers - it is set up with incremental and
async Checkpoints. I am assuming the main reason is that states are transient
and the deltas are essentially the entire set of new states. Our main concern is
real-time processing vs no data loss or even possibly a few duplicates. To
Fabian’s point, at-least-once vs exactly-once semantics are also not of utmost
concern at this point. Now, the bottom line is that I have Checkpointing
disabled and use the MemoryStateBackend with the thought that writing massive
states to persistence every few seconds didn’t seem like the best use of
resources - I’d rather fit more of these apps in the cluster and use stateful
processing for the apps that really need it. However, this leads to 2 main
issues:

1- If an operator fails (let’s say Kafka timeouts), the entire job graph
restarts, which leaves us with a larger-than-desirable gap in data (lost states
across 100s of operators) as obviously there is no recoverable state
2- The same issue happens in planned restarts

Per Fabian’s suggestion, I am going to try the RocksDB State Backend with local
drives and run some baseline tests - hoping states are kept in memory for the
most part unless spillover is needed. This should at least give us a decent
solution for (2). I am still not convinced we should enable periodic
Checkpointing (perhaps I am wrong here but again I have highlighted my reasons
above).
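
For context, a rough sketch of the kind of setup described above - RocksDB
backend with incremental checkpoints every 5 seconds; the checkpoint URI and
job name are placeholders, and this assumes the Flink 1.5 RocksDBStateBackend
API:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDBCheckpointSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // RocksDB keeps working state on the TaskManagers' local disks; the URI is only
        // where completed checkpoints are persisted. The second argument enables
        // incremental checkpoints; RocksDB snapshots are taken asynchronously.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints/iot-job", true));

        // Checkpoint every 5 seconds; at-least-once is sufficient for this use case.
        env.enableCheckpointing(5_000, CheckpointingMode.AT_LEAST_ONCE);

        // ... sources, keyed windows, sinks ...
        // env.execute("iot-metrics");
    }
}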

"
> Just for my own understanding: What do you want to do with event-based 
> checkpointing triggers (onRestart, onShutdown?). Do you want to draw a 
> partial snapshot which should then be used for recovery? Technically, this is 
> possible but I'm not sure how much value this would add for Flink users. A 
> partial snapshot could also be completely empty (equivalent of disabling 
> checkpointing).

> I can see the point of making the checkpoint triggering more flexible and 
> giving some control to the user. In contrast to savepoints, checkpoints are 
> considered for recovery. My question here would be, what would be the 
> triggering condition in your case (other than time)?
"
I’d think the trigger condition would be based on a life-cycle hook like
RESTART (or perhaps even an external message when FLINK-6131 is available). A
partial (best possible) snapshot is exactly what it would be - states from
failing operators obviously cannot be expected to be recoverable.
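
To make the idea concrete, a purely hypothetical sketch of what such a hook
could look like - none of these types exist in Flink; they only illustrate the
proposal:

// Hypothetical illustration only -- not an existing Flink API.
public interface TeardownSnapshotHook {

    /** Why the job graph is being torn down. */
    enum TeardownCause { OPERATOR_FAILURE_RESTART, PLANNED_SHUTDOWN, EXTERNAL_TRIGGER }

    /**
     * Called once per task before its slot is released. Returning true would ask
     * the runtime to include this task's state in a best-effort ("partial")
     * snapshot; tasks that actually failed would simply be skipped.
     */
    boolean snapshotOnTeardown(TeardownCause cause);
}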

> What the community will add very soon is an atomic stop with savepoint call
> which will take a savepoint of your job's state and then shut it down.

Very nice! Would this also require the Fs or RocksDB State Backend? It
shouldn’t be an issue for us as long as my tests above turn out to be decent.

Thanks again guys for your advice and feedback. Really appreciate it.

— Ashish

 
> On Jun 15, 2018, at 5:43 AM, Till Rohrmann  wrote:
> 
> Hi,
> 
> ideally we would not have to cancel all tasks and redeploy the whole job in
> case of a restart. Instead we should do what you've outlined: redeploy the
> failed tasks and reset the state of all other running tasks. At the moment,
> this is, however, not yet possible. While improving Flink's recovery behavior,
> this should be addressed eventually.
> 
> Just for my own understanding: What do you want to do with event-based 
> checkpointing triggers (onRestart, onShutdown?). Do you want to draw a 
> partial snapshot which should then be used for recovery? Technically, this is 
> possible but I'm not sure how much value this would add for Flink users. A 
> partial snapshot could also be completely empty (equivalent of disabling 
> checkpointing).
> 
> I can see the point of making the checkpoint triggering more flexible and 
> giving some control to the user. In contrast to savepoints, checkpoints are 
> considered for recovery. My question here would be, what would be the 
> triggering condition in your case (other than time)?
> 
> What the community will add very soon is an atomic stop with savepoint call,
> which will take a savepoint of your job's state and then shut it down.
> 
> Cheers,
> Till

Re: IoT Use Case, Problem and Thoughts

2018-06-15 Thread Till Rohrmann
Hi,

ideally we would not have to cancel all tasks and redeploy the whole job in
case of a restart. Instead we should do what you've outlined: redeploy the
failed tasks and reset the state of all other running tasks. At the moment,
this is, however, not yet possible. While improving Flink's recovery behavior,
this should be addressed eventually.

Just for my own understanding: What do you want to do with event-based
checkpointing triggers (onRestart, onShutdown?). Do you want to draw a
partial snapshot which should then be used for recovery? Technically, this
is possible but I'm not sure how much value this would add for Flink users.
A partial snapshot could also be completely empty (equivalent of disabling
checkpointing).

I can see the point of making the checkpoint triggering more flexible and
giving some control to the user. In contrast to savepoints, checkpoints are
considered for recovery. My question here would be, what would be the
triggering condition in your case (other than time)?

What the community will add very soon is an atomic stop with savepoint call,
which will take a savepoint of your job's state and then shut it down.

Cheers,
Till

On Thu, Jun 14, 2018 at 4:55 PM Fabian Hueske  wrote:

> Hi Ashish,
>
> (I think) I understand your requirements, and the approach of just keeping
> non-failing tasks running is intuitively a good idea.
> However, this is only an option for use cases that are OK with
> at-least-once semantics (otherwise, we'd need to reset the state of the
> still running tasks and hence take checkpoints).
> Moreover, the distributed task coordination for keeping some tasks
> running, restarting others, and connecting them is obviously more difficult
> than "just" canceling the whole job and starting it again.
>
> I have to admit that I'm not that familiar with Flink's distributed task
> coordination. Till in CC knows much more about that.
> However, I think the question here is, how many use cases would benefit
> from a recovery mode with at-most-once state guarantees and how much
> implementation effort would it be to support it.
>
> Regarding the savepoints: if you are using the MemoryStateBackend, failures
> for large state sizes are expected, since all state is replicated into the
> JobManager JVM.
> Did you try to use the FsStateBackend? It also holds the state on the
> TaskManager heap but backs it up to a (distributed) filesystem.
>
> Best, Fabian
>
> 2018-06-14 4:18 GMT+02:00 Ashish Pokharel :
>
>> Hi Fabian,
>>
>> Thanks for the prompt response and apologies for delayed response.
>>
>> You wrapped up the bottom lines pretty well - if I were to wrap it up I’d
>> say “best possible” recovery on “known” restarts, whether a manual cancel +
>> start or framework-initiated ones like operator failures, with these
>> constraints:
>>  - some data loss is ok
>>  - avoid periodic checkpoints, as states are really transient (less than 5
>> seconds of lifetime if not milliseconds) and almost all events make it to
>> state. I do understand that checkpointing performance has drastically been
>> improved and, with the async and RocksDB options, it should technically not
>> add latency to the application. However, I feel like even with those
>> improvements and local checkpointing (which we are already doing) it is a lot
>> of “unused” IOPS/resource utilization, especially if we start to spin up more
>> apps handling similar data sources with similar requirements. At first blush
>> it feels like those resources are better utilized in the cluster for apps
>> with stricter SLAs for data loss and recovery instead.
>>
>> Basically, I suppose I am thinking of a Checkpointing feature that is
>> initiated by certain actions / events rather than being periodic. Let me
>> know if I am off-base here and I should just enable checkpointing in all of
>> these apps and move on :)
>>
>> I tried a Savepoint again and it looks like the issue is caused by the fact
>> that the in-memory states are large, as it is throwing an error that states
>> are larger than a certain size. So the solution for (1) will possibly solve
>> (2) as well.
>>
>> Thanks again,
>>
>> Ashish
>>

Re: IoT Use Case, Problem and Thoughts

2018-06-14 Thread Fabian Hueske
Hi Ashish,

(I think) I understand your requirements, and the approach of just keeping
non-failing tasks running is intuitively a good idea.
However, this is only an option for use cases that are OK with
at-least-once semantics (otherwise, we'd need to reset the state of the
still running tasks and hence take checkpoints).
Moreover, the distributed task coordination for keeping some tasks running,
restarting others, and connecting them is obviously more difficult than
"just" canceling the whole job and starting it again.

I have to admit that I'm not that familiar with Flink's distributed task
coordination. Till in CC knows much more about that.
However, I think the question here is, how many use cases would benefit
from a recovery mode with at-most-once state guarantees and how much
implementation effort would it be to support it.

Regarding the savepoints: if you are using the MemoryStateBackend, failures
for large state sizes are expected, since all state is replicated into the
JobManager JVM.
Did you try to use the FsStateBackend? It also holds the state on the
TaskManager heap but backs it up to a (distributed) filesystem.
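
As a rough sketch of that suggestion (paths are placeholders): the
FsStateBackend keeps working state on the TaskManager heap, like the
MemoryStateBackend, but writes checkpoint and savepoint data to a filesystem
instead of the JobManager heap:

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FsBackendSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Working state stays on the TaskManager heap; checkpoint/savepoint data goes
        // to the given filesystem URI. The second argument requests async snapshots.
        env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints/iot-job", true));

        // ... rest of the job ...
    }
}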

Best, Fabian

2018-06-14 4:18 GMT+02:00 Ashish Pokharel :

> Hi Fabian,
>
> Thanks for the prompt response and apologies for delayed response.
>
> You wrapped up the bottom lines pretty well - if I were to wrap it up I’d
> say “best possible” recovery on “known” restarts, whether a manual cancel +
> start or framework-initiated ones like operator failures, with these
> constraints:
>  - some data loss is ok
>  - avoid periodic checkpoints, as states are really transient (less than 5
> seconds of lifetime if not milliseconds) and almost all events make it to
> state. I do understand that checkpointing performance has drastically been
> improved and, with the async and RocksDB options, it should technically not
> add latency to the application. However, I feel like even with those
> improvements and local checkpointing (which we are already doing) it is a lot
> of “unused” IOPS/resource utilization, especially if we start to spin up more
> apps handling similar data sources with similar requirements. At first blush
> it feels like those resources are better utilized in the cluster for apps
> with stricter SLAs for data loss and recovery instead.
>
> Basically, I suppose I am thinking of a Checkpointing feature that is
> initiated by certain actions / events rather than being periodic. Let me
> know if I am off-base here and I should just enable checkpointing in all of
> these apps and move on :)
>
> I tried a Savepoint again and it looks like the issue is caused by the fact
> that the in-memory states are large, as it is throwing an error that states
> are larger than a certain size. So the solution for (1) will possibly solve
> (2) as well.
>
> Thanks again,
>
> Ashish
>
>
> On Jun 7, 2018, at 4:25 PM, Fabian Hueske  wrote:
>
> Hi Ashish,
>
> Thanks for the great write up.
> If I understood you correctly, there are two different issues that are
> caused by the disabled checkpointing.
>
> 1) Recovery from a failure without restarting all operators to preserve
> the state in the running tasks
> 2) Planned restarts of an application without losing all state (even with 
> disabled checkpointing).
>
> Ad 1) The community is constantly working on reducing the time for
> checkpointing and recovery.
> For 1.5, local task recovery was added, which basically stores a state copy 
> on the local disk which is read in case of a recovery. So, tasks are 
> restarted but read the state to restore from the local disk instead of from 
> distributed storage.
> AFAIK, this can only be used together with remote checkpoints. I think
> this might be an interesting option for you if it would be possible to
> write checkpoints only to local disk and not remote storage. AFAIK, there
> are also other efforts to reduce the number of restarted tasks in case of a
> failure. I guess, you've played with other features such as
> RocksDBStateBackend, incremental and async checkpoints already.
>
> Ad 2) It sounds as if savepoints are exactly the feature you are looking 
> for. It would be good to know what exactly did not work for you. The 
> MemoryStateBackend is not suitable for large state sizes because it backs up 
> state into the heap memory of the JobManager. 
>
> Best, Fabian

Re: IoT Use Case, Problem and Thoughts

2018-06-13 Thread Ashish Pokharel
Hi Fabian,

Thanks for the prompt response and apologies for delayed response. 

You wrapped up the bottom lines pretty well - if I were to wrap it up I’d say
“best possible” recovery on “known” restarts, whether a manual cancel + start
or framework-initiated ones like operator failures, with these constraints:
 - some data loss is ok
 - avoid periodic checkpoints, as states are really transient (less than 5
seconds of lifetime if not milliseconds) and almost all events make it to
state. I do understand that checkpointing performance has drastically been
improved and, with the async and RocksDB options, it should technically not add
latency to the application. However, I feel like even with those improvements
and local checkpointing (which we are already doing) it is a lot of “unused”
IOPS/resource utilization, especially if we start to spin up more apps handling
similar data sources with similar requirements. At first blush it feels like
those resources are better utilized in the cluster for apps with stricter SLAs
for data loss and recovery instead.

Basically, I suppose I am thinking of a Checkpointing feature that is initiated
by certain actions / events rather than being periodic. Let me know if I am
off-base here and I should just enable checkpointing in all of these apps and
move on :)

I tried a Savepoint again and it looks like the issue is caused by the fact
that the in-memory states are large, as it is throwing an error that states are
larger than a certain size. So the solution for (1) will possibly solve (2) as
well.
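
For reference, that error is consistent with the MemoryStateBackend's per-state
size cap (5 MB by default). A rough sketch of raising it is below, but the
snapshot still has to fit into the JobManager heap, so the FsStateBackend is
the more robust fix; the 64 MB value is only illustrative:

import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MemoryBackendLimitSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Raise the per-state snapshot cap from the 5 MB default to 64 MB.
        // Snapshots are still replicated into the JobManager JVM, so this does not scale far.
        int maxStateSizeBytes = 64 * 1024 * 1024;
        env.setStateBackend(new MemoryStateBackend(maxStateSizeBytes, true)); // true = async snapshots

        // ... rest of the job ...
    }
}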

Thanks again,

Ashish


> On Jun 7, 2018, at 4:25 PM, Fabian Hueske  wrote:
> 
> Hi Ashish,
> 
> Thanks for the great write up. 
> If I understood you correctly, there are two different issues that are caused 
> by the disabled checkpointing.
> 
> 1) Recovery from a failure without restarting all operators to preserve the 
> state in the running tasks
> 2) Planned restarts of an application without losing all state (even with 
> disabled checkpointing).
> 
> Ad 1) The community is constantly working on reducing the time for 
> checkpointing and recovery. 
> For 1.5, local task recovery was added, which basically stores a state copy 
> on the local disk which is read in case of a recovery. So, tasks are 
> restarted but read the state to restore from the local disk instead of from 
> distributed storage.
> AFAIK, this can only be used together with remote checkpoints. I think this 
> might be an interesting option for you if it would be possible to write 
> checkpoints only to local disk and not remote storage. AFAIK, there are also 
> other efforts to reduce the number of restarted tasks in case of a failure. I 
> guess, you've played with other features such as RocksDBStateBackend, 
> incremental and async checkpoints already. 
> 
> Ad 2) It sounds as if savepoints are exactly the feature you are looking 
> for. It would be good to know what exactly did not work for you. The 
> MemoryStateBackend is not suitable for large state sizes because it backs up 
> state into the heap memory of the JobManager. 
> 
> Best, Fabian

Re: IoT Use Case, Problem and Thoughts

2018-06-07 Thread Fabian Hueske
Hi Ashish,

Thanks for the great write up.
If I understood you correctly, there are two different issues that are
caused by the disabled checkpointing.

1) Recovery from a failure without restarting all operators to preserve the
state in the running tasks
2) Planned restarts of an application without losing all state (even with
disabled checkpointing).

Ad 1) The community is constantly working on reducing the time for
checkpointing and recovery.
For 1.5, local task recovery was added, which basically stores a state copy
on the local disk which is read in case of a recovery. So, tasks are restarted
but read the state to restore from the local disk instead of from distributed
storage.
AFAIK, this can only be used together with remote checkpoints. I think this
might be an interesting option for you if it would be possible to write
checkpoints only to local disk and not remote storage. AFAIK, there are
also other efforts to reduce the number of restarted tasks in case of a
failure. I guess, you've played with other features such as
RocksDBStateBackend, incremental and async checkpoints already.

Ad 2) It sounds as if savepoints are exactly the feature you are looking for.
It would be good to know what exactly did not work for you. The
MemoryStateBackend is not suitable for large state sizes because it backs up
state into the heap memory of the JobManager.

Best, Fabian

2018-06-05 21:57 GMT+02:00 ashish pok :

> Fabian, Stephan, All,
>
> I started a discussion a while back around having a form of event-based
> checkpointing policy that will help us in some of our high volume data
> pipelines. Here is an effort to put this in front of the community and
> understand what capabilities can support these types of use cases, how much
> others feel the same need, and potentially shape a feature that can make it
> to a user story.
>
> *Use Case Summary:*
> - Extremely high volume of data (events from consumer devices with a
> customer base of over 100M)
> - Multiple events need to be combined using a windowing streaming app
> grouped by keys (something like a 5 min floor of timestamp and unique
> identifiers for customer devices)
> - "Most" events for a group/key arrive within a few seconds if not
> milliseconds; however, events can sometimes be delayed or get lost in
> transport (so delayed event handling and timeouts will be needed)
> - Extremely low (pretty vague but hopefully the details below clarify it
> more) data loss is acceptable
> - Because of the volume and transient nature of the source, checkpointing is
> turned off (saves on writes to persistence as states/sessions are active
> for only a few seconds during processing)
>
> *Problem Summary:*
> Of course, none of the above is out of the norm for Flink and as a matter
> of fact we already have a Flink app doing this. The issue arises when it
> comes to graceful shutdowns and on operator failures (eg: Kafka timeouts
> etc.) On operator failures, the entire job graph restarts, which essentially
> flushes out in-memory states/sessions. I think there is a feature in the
> works (not sure if it made it to 1.5) to perform selective restarts which
> will control the damage but will still result in data loss. Also, it doesn't
> help when application restarts are needed. We did try going the savepoint
> route for explicit restart needs but I think the MemoryStateBackend ran into
> issues for larger states or something along those lines (not certain). We
> obviously cannot recover an operator that actually fails because its own
> state could be unrecoverable. However, it feels like Flink already has a lot
> of plumbing to help with the overall problem of allowing some sort of
> recoverable state to handle graceful shutdowns and restarts with minimal
> data loss.
>
> *Solutions:*
> Some in the community commented on my last email with decent ideas like having
> an event-based checkpointing trigger (on shutdown, on restart etc) or
> life-cycle hooks (onCancel, onRestart etc) in Functions that can be
> implemented if this type of behavior is needed etc.
>
> Appreciate feedback from the community on how useful this might be for
> others and from core contributors on their thoughts as well.
>
> Thanks in advance, Ashish
>
>


IoT Use Case, Problem and Thoughts

2018-06-05 Thread ashish pok
Fabian, Stephan, All,

I started a discussion a while back around having a form of event-based
checkpointing policy that will help us in some of our high volume data
pipelines. Here is an effort to put this in front of the community and
understand what capabilities can support these types of use cases, how much
others feel the same need, and potentially shape a feature that can make it to
a user story.

Use Case Summary:
- Extremely high volume of data (events from consumer devices with a customer
base of over 100M)
- Multiple events need to be combined using a windowing streaming app grouped
by keys (something like a 5 min floor of timestamp and unique identifiers for
customer devices - a rough sketch follows this list)
- "Most" events for a group/key arrive within a few seconds if not
milliseconds; however, events can sometimes be delayed or get lost in transport
(so delayed event handling and timeouts will be needed)
- Extremely low (pretty vague but hopefully the details below clarify it more)
data loss is acceptable
- Because of the volume and transient nature of the source, checkpointing is
turned off (saves on writes to persistence as states/sessions are active for
only a few seconds during processing)
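
To make the pipeline shape concrete, a minimal, self-contained sketch of the
kind of keyed 5-minute window described above; the DeviceEvent type, field
names, lateness bound, and aggregation are illustrative assumptions, not the
actual app:

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class DeviceWindowSketch {

    /** Hypothetical device event: id, event-time timestamp, numeric payload. */
    public static class DeviceEvent {
        public String deviceId;
        public long timestampMillis;
        public double value;

        public DeviceEvent() {}
        public DeviceEvent(String id, long ts, double v) {
            this.deviceId = id; this.timestampMillis = ts; this.value = v;
        }
        @Override
        public String toString() {
            return deviceId + "@" + timestampMillis + "=" + value;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // In the real job this would be a Kafka source; a few elements keep the sketch runnable.
        env.fromElements(
                new DeviceEvent("device-1", 1_000L, 1.0),
                new DeviceEvent("device-1", 2_000L, 2.0),
                new DeviceEvent("device-2", 1_500L, 3.0))
            .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<DeviceEvent>(Time.seconds(30)) {
                    @Override
                    public long extractTimestamp(DeviceEvent e) {
                        return e.timestampMillis;
                    }
                })
            // "5 min floor of timestamp and unique identifiers for customer devices"
            .keyBy("deviceId")
            .window(TumblingEventTimeWindows.of(Time.minutes(5)))
            .allowedLateness(Time.minutes(1)) // tolerate stragglers without keeping state for long
            .reduce((a, b) -> new DeviceEvent(
                    a.deviceId, Math.max(a.timestampMillis, b.timestampMillis), a.value + b.value))
            .print();

        env.execute("device-window-sketch");
    }
}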

Problem Summary:
Of course, none of the above is out of the norm for Flink and as a matter of
fact we already have a Flink app doing this. The issue arises when it comes to
graceful shutdowns and on operator failures (eg: Kafka timeouts etc.) On
operator failures, the entire job graph restarts, which essentially flushes out
in-memory states/sessions. I think there is a feature in the works (not sure if
it made it to 1.5) to perform selective restarts which will control the damage
but will still result in data loss. Also, it doesn't help when application
restarts are needed. We did try going the savepoint route for explicit restart
needs but I think the MemoryStateBackend ran into issues for larger states or
something along those lines (not certain). We obviously cannot recover an
operator that actually fails because its own state could be unrecoverable.
However, it feels like Flink already has a lot of plumbing to help with the
overall problem of allowing some sort of recoverable state to handle graceful
shutdowns and restarts with minimal data loss.

Solutions:
Some in the community commented on my last email with decent ideas like having
an event-based checkpointing trigger (on shutdown, on restart etc) or
life-cycle hooks (onCancel, onRestart etc) in Functions that can be implemented
if this type of behavior is needed etc.

Appreciate feedback from the community on how useful this might be for others
and from core contributors on their thoughts as well.

Thanks in advance, Ashish