Re: How to clear keyed states periodically?

2018-09-14 Thread Paul Lam
Hi Kien,

Thanks for your reply! 

The approaches you suggested are very useful. I'll redesign the state 
structure and try them. Thanks a lot!

Best,
Paul Lam





Re: How to clear keyed states periodically?

2018-09-14 Thread David Anderson
Paul -- I'm not confident the broadcast approach would perform well enough.
Even without all those timers your job might behave poorly if you try to
hit all of the keys at once to clear all the state; I don't know that
anyone has tried this. As Kien suggested, it may be necessary to find an
approach to do this state clearing more continuously.

David


-- 
*David Anderson* | Training Coordinator | data Artisans
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time


Re: How to clear keyed states periodically?

2018-09-14 Thread Paul Lam
Hi David,

Your information is very helpful! Thank you!

BroadcastStream can definitely do the job, but I think it makes the 
architecture rather complicated, so it will be my last resort.

I wonder whether it would be possible to implement a clearAll() method for 
keyed state that clears user state for all namespaces. Or would that violate 
the principle of keyed state?

Thanks again!

Best,
Paul Lam




Re: How to clear keyed states periodically?

2018-09-14 Thread Kien Truong
Hi Paul,

We have actually done something like this. Clearing state with the RocksDB
state backend can be a very expensive operation, and with large state it can
block the operators for minutes.

To mitigate that, we use two approaches:

1. Keep the state small by increasing the cardinality of the key.
2. Do not clear the entire state at once; instead, continuously remove small
chunks of the state using timers.
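
As an illustration of the second approach, here is a minimal sketch of
chunked removal in plain Java. It is not the code from the job described
above: a java.util.Map stands in for the keyed MapState, and the names
ChunkedCleanup and removeChunk as well as the chunk size are hypothetical.
In a Flink job the same loop would presumably run over the state's iterator
inside onTimer(), with another timer registered while entries remain.

```java
import java.util.Iterator;
import java.util.Map;

final class ChunkedCleanup {
    private ChunkedCleanup() {}

    /**
     * Removes at most maxEntries entries from the given map (a stand-in for MapState).
     * Returns true if entries remain, i.e. another cleanup pass should be scheduled.
     */
    static <K, V> boolean removeChunk(Map<K, V> state, int maxEntries) {
        Iterator<Map.Entry<K, V>> it = state.entrySet().iterator();
        int removed = 0;
        while (removed < maxEntries && it.hasNext()) {
            it.next();
            it.remove(); // remove through the iterator, not through the map
            removed++;
        }
        return !state.isEmpty();
    }
}
```

Each pass does a bounded amount of work, so no single timer firing blocks the
operator for long; the cost is that the state is only empty after several
passes.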

Regards,
Kien



Re: How to clear keyed states periodically?

2018-09-14 Thread David Anderson
Paul,

Theoretically, processing-time timers will get the job done, but yes, you'd
need a timer per key -- and folks who've tried this with millions of keys,
all firing at the same time, have reported that this behaves badly. For
some use cases it's workable to spread out the timers over an interval,
like an hour or two, to avoid this timer firing storm, but that doesn't
sound like it would work well for you.
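
To make the idea of spreading out the timers concrete, here is a minimal
sketch in plain Java. It derives a deterministic per-key offset from the
key's hash code, so that cleanup times fall somewhere in
[boundary, boundary + spread) instead of all at the same instant. The names
SpreadTimers and cleanupTimestamp are hypothetical; in a Flink job the
returned value is what would be passed to
ctx.timerService().registerProcessingTimeTimer(...).

```java
final class SpreadTimers {
    private SpreadTimers() {}

    /**
     * Returns boundaryMillis plus a per-key offset in [0, spreadMillis).
     * The offset depends only on key.hashCode(), so the same key always
     * gets the same cleanup time for a given boundary.
     */
    static long cleanupTimestamp(Object key, long boundaryMillis, long spreadMillis) {
        if (spreadMillis <= 0) {
            throw new IllegalArgumentException("spreadMillis must be positive");
        }
        // floorMod keeps the offset non-negative even for negative hash codes
        long offset = Math.floorMod((long) key.hashCode(), spreadMillis);
        return boundaryMillis + offset;
    }
}
```

The trade-off is that a key's state may survive for up to spreadMillis past
the boundary, which is why this does not fit a use case that needs all state
gone at one exact moment.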

You might instead try using broadcast state to deal with this. You would
establish a broadcast stream connected to your keyed stream that acts as a
control stream for the keyed state. Then in the processBroadcastElement
method of a KeyedBroadcastProcessFunction you would use applyToKeyedState
to iterate over all the keyed state and clear everything. Unfortunately
it's not possible to use timers on broadcast state, so you'll have to find
some other way to trigger the event on the broadcast stream -- maybe a
custom source that uses a ProcessingTimeCallback to create events on the
broadcast stream.
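
The difference between per-key processing and a broadcast control event can
be illustrated with a toy in-memory model. This is plain Java, not Flink
code: the class ControlStreamModel and its methods are hypothetical, and a
HashMap of sets stands in for the keyed state held by one parallel instance.
onElement touches only the state of the current key, while onControlEvent
visits every key, which is the role applyToKeyedState would play inside
processBroadcastElement.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class ControlStreamModel {
    // key -> ids already seen for that key (stand-in for per-key MapState)
    private final Map<String, Set<String>> seenByKey = new HashMap<>();

    /** Data path: returns true if id is new for this key; only this key's state is touched. */
    boolean onElement(String key, String id) {
        return seenByKey.computeIfAbsent(key, k -> new HashSet<>()).add(id);
    }

    /** Control path: clears the state of every key held by this instance. */
    void onControlEvent() {
        seenByKey.values().forEach(Set::clear);
    }
}
```

Note that the control path does work proportional to the number of keys in a
single call, so this model says nothing about how well the real thing would
perform with millions of keys.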

David



Re: How to clear keyed states periodically?

2018-09-13 Thread Paul Lam
Hi vino,

Thanks for the advice, but I think state TTL does not completely fit in my case.

AFAIK, State TTL works at the per-entry level and expires entries after a 
period of inactivity, but I need a TTL for the whole MapState that does not 
depend on when the entries were created or updated. Suppose I’m calculating 
daily-active-user stats and use a userId field as the key: I want the state 
completely truncated at the very beginning of each day. 
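
To pin down what "the very beginning of each day" means in code, here is a
minimal sketch using java.time that computes the next day boundary as epoch
milliseconds. The names DayBoundary and nextMidnightMillis are hypothetical,
and the time zone is an assumption that would have to match how a "day" is
defined for the stats.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;

final class DayBoundary {
    private DayBoundary() {}

    /** Returns the epoch-millisecond timestamp of the first instant of the next day in the given zone. */
    static long nextMidnightMillis(long nowMillis, ZoneId zone) {
        ZonedDateTime now = Instant.ofEpochMilli(nowMillis).atZone(zone);
        return now.toLocalDate()
                .plusDays(1)
                .atStartOfDay(zone) // handles zones where the day does not start at 00:00
                .toInstant()
                .toEpochMilli();
    }
}
```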

Thanks a lot!

Best,
Paul Lam





Re: How to clear keyed states periodically?

2018-09-13 Thread vino yang
Hi Paul,

Maybe you could take a look at State TTL? [1]

Thanks, vino.

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/state.html#state-time-to-live-ttl

On Wed, Sep 12, 2018 at 6:06 PM, Paul Lam wrote:

> Hi,
>
> I’m using MapState to deduplicate some ids, and the MapState needs to be
> truncated periodically. I tried using a ProcessingTimeCallback to call
> state.clear(), but that way I can only clear the state for one key, whereas
> I actually need a key-group-level cleanup. Is there a best practice for my
> case? Thanks a lot!
>
> Best,
> Paul Lam