Hi Kien,

Thanks for your reply! 

The approaches you suggested are very useful. I'll redesign the state 
structure and try them. Thanks a lot!

Best,
Paul Lam


> On Sep 14, 2018, at 17:01, Kien Truong <duckientru...@gmail.com> wrote:
> 
> Hi Paul,
> 
> We have actually done something like this. Clearing state with the RocksDB 
> state backend can be a very expensive operation and can block the operators 
> for minutes when the state is large.
> 
> To mitigate that, we use two approaches:
> 
> 1. Keep the state small by increasing the cardinality of the key.
> 2. Do not clear the entire state at once; instead, continuously remove small 
> chunks of the state using timers.
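
A minimal sketch of the second approach, written as plain Java with no Flink dependency so that it runs on its own. The class `ChunkedCleanup`, its methods, and the `maxPerTick` parameter are hypothetical names chosen for illustration; a `java.util.Map` stands in for the MapState, and each call to `removeChunk` stands in for one timer firing. In a real job the same loop would presumably run inside a timer callback that re-registers itself while entries remain.

```java
import java.util.Iterator;
import java.util.Map;

/** Flink-free simulation of clearing a large map in small chunks. */
final class ChunkedCleanup {

    /**
     * Removes at most maxPerTick entries from the map.
     * Returns true if entries remain, i.e. another "timer" should be scheduled.
     */
    static <K, V> boolean removeChunk(Map<K, V> state, int maxPerTick) {
        if (maxPerTick < 1) {
            throw new IllegalArgumentException("maxPerTick must be >= 1");
        }
        Iterator<Map.Entry<K, V>> it = state.entrySet().iterator();
        int removed = 0;
        while (removed < maxPerTick && it.hasNext()) {
            it.next();
            it.remove(); // remove through the iterator to avoid ConcurrentModificationException
            removed++;
        }
        return !state.isEmpty();
    }

    /**
     * Calls removeChunk until the map is empty and returns the number of
     * calls made; each call models one timer firing.
     */
    static <K, V> int drain(Map<K, V> state, int maxPerTick) {
        int ticks = 0;
        boolean more = !state.isEmpty();
        while (more) {
            more = removeChunk(state, maxPerTick);
            ticks++;
        }
        return ticks;
    }
}
```

The chunk size trades cleanup latency against how long a single firing blocks the operator: a smaller `maxPerTick` means shorter pauses but more firings before the state is empty.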
> 
> Regards,
> Kien
> 
>> On Sep 14, 2018 at 15:01, David Anderson <da...@data-artisans.com> wrote:
>> 
>> Paul,
>> 
>> Theoretically, processing-time timers will get the job done, but yes, you'd 
>> need a timer per key -- and folks who've tried this with millions of keys, 
>> all firing at the same time, have reported that this behaves badly. For some 
>> use cases it's workable to spread out the timers over an interval, like an 
>> hour or two, to avoid this timer firing storm, but that doesn't sound like 
>> it would work well for you.
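
A minimal sketch of the timer-spreading idea, in plain Java with no Flink dependency. The class `TimerSpread`, its method names, and the choice of hashing the key to pick an offset are assumptions made for illustration, not something stated in this thread. The computed timestamp is the kind of value one would pass to Flink's `registerProcessingTimeTimer`.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;

/** Computes a per-key cleanup time spread over an interval after midnight. */
final class TimerSpread {

    /** Epoch millis of the start of the day following nowMillis in the given zone. */
    static long nextMidnightMillis(long nowMillis, ZoneId zone) {
        ZonedDateTime now = Instant.ofEpochMilli(nowMillis).atZone(zone);
        return now.toLocalDate().plusDays(1).atStartOfDay(zone).toInstant().toEpochMilli();
    }

    /** Deterministic per-key offset in the range [0, spreadMillis). */
    static long offsetFor(Object key, long spreadMillis) {
        if (spreadMillis < 1) {
            throw new IllegalArgumentException("spreadMillis must be >= 1");
        }
        // floorMod keeps the result non-negative even for negative hash codes
        return Math.floorMod((long) key.hashCode(), spreadMillis);
    }

    /** Fire time for this key: next midnight plus the key's offset. */
    static long fireTimeFor(Object key, long nowMillis, ZoneId zone, long spreadMillis) {
        return nextMidnightMillis(nowMillis, zone) + offsetFor(key, spreadMillis);
    }
}
```

With a spread of one hour, a given key's state may survive up to an hour past midnight, which is the reason this option may not suit a strict start-of-day truncation.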
>> 
>> You might instead try using broadcast state to deal with this. You would 
>> establish a broadcast stream connected to your keyed stream that acts as a 
>> control stream for the keyed state. Then in the processBroadcastElement 
>> method of a KeyedBroadcastProcessFunction you would use applyToKeyedState to 
>> iterate over all the keyed state and clear everything. Unfortunately it's 
>> not possible to use timers on broadcast state, so you'll have to find some 
>> other way to trigger the event on the broadcast stream -- maybe a custom 
>> source that uses a ProcessingTimeCallback to create events on the broadcast 
>> stream.
>> 
>> David
>> 
>> On Fri, Sep 14, 2018 at 7:18 AM Paul Lam <paullin3...@gmail.com> wrote:
>> >
>> > Hi vino,
>> >
>> > Thanks for the advice, but I think State TTL does not completely fit my 
>> > case.
>> >
>> > AFAIK, State TTL is applied per entry and uses an inactivity threshold to 
>> > expire entries, but I need a TTL for the whole MapState that does not 
>> > depend on when the entries are created or updated. Suppose I’m calculating 
>> > stats of daily active users, keyed by a userId field: I want the state 
>> > fully truncated at the very beginning of each day.
>> >
>> > Thanks a lot!
>> >
>> > Best,
>> > Paul Lam
>> >
>> >
>> > On Sep 14, 2018, at 10:39, vino yang <yanghua1...@gmail.com> wrote:
>> >
>> > Hi Paul,
>> >
>> > Maybe you could look into State TTL? [1]
>> >
>> > Thanks, vino.
>> >
>> > [1]: 
>> > https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/state.html#state-time-to-live-ttl
>> >
>> > On Wed, Sep 12, 2018 at 6:06 PM, Paul Lam <paullin3...@gmail.com> wrote:
>> >>
>> >> Hi,
>> >>
>> >> I’m using MapState to deduplicate some ids, and the MapState needs to be 
>> >> truncated periodically. I tried using a ProcessingTimeCallback to call 
>> >> state.clear(), but that way I can only clear the state for one key, 
>> >> and I actually need a key-group-level cleanup. Is there any best 
>> >> practice for my case? Thanks a lot!
>> >>
>> >> Best,
>> >> Paul Lam
>> >
>> >
>> 
>> 
>> --
>> David Anderson | Training Coordinator | data Artisans
