BTW, it might be better to pre-aggregate via a sliding window with a 
controlled bucket size, batched updates, and retention.
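
A minimal sketch of that pre-aggregation idea, written in plain Python rather than against the Flink API. The class and method names (`BucketedSlidingSum`, `add`, `expire`, `window_sum`) are hypothetical and only illustrate the technique: each element updates exactly one fixed-size bucket, the window result is the sum of the live buckets, and retention drops buckets that have slid out of the window.

```python
class BucketedSlidingSum:
    """Sliding-window sum built from fixed-size, pre-aggregated buckets.

    Illustration only: a real job would keep the buckets in keyed state.
    """

    def __init__(self, window_ms, bucket_ms):
        if window_ms % bucket_ms != 0:
            raise ValueError("window_ms must be a multiple of bucket_ms")
        self.window_ms = window_ms
        self.bucket_ms = bucket_ms
        self.buckets = {}  # bucket start timestamp -> partial sum

    def add(self, timestamp_ms, value):
        # One state update per element, regardless of window size.
        start = timestamp_ms - timestamp_ms % self.bucket_ms
        self.buckets[start] = self.buckets.get(start, 0) + value

    def expire(self, now_ms):
        # Retention: drop buckets that ended at or before (now - window).
        cutoff = now_ms - self.window_ms
        expired = [s for s in self.buckets if s + self.bucket_ms <= cutoff]
        for start in expired:
            del self.buckets[start]

    def window_sum(self, now_ms):
        # Combine the surviving buckets that started before now_ms.
        self.expire(now_ms)
        return sum(v for s, v in self.buckets.items() if s < now_ms)


agg = BucketedSlidingSum(window_ms=60_000, bucket_ms=10_000)
agg.add(1_000, 1)
agg.add(5_000, 2)    # same bucket as the previous element
agg.add(15_000, 3)
print(agg.window_sum(20_000))  # 6
```

The bucket size trades accuracy for cost: results are only exact at bucket boundaries, but the number of state entries per key is bounded by window size divided by bucket size.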

Thanks,
Chen

> On May 29, 2017, at 3:05 PM, Chen Qin <qinnc...@gmail.com> wrote:
> 
> I see. I'm not sure this hack works in general. It uses operator state to hold 
> all <key, state> mappings assigned to that operator instance.
> 
> If keyBy produces a deterministic mapping from upstream events to a fixed 
> operator parallelism, then operator state can hold the mapping between keys 
> and their states, and it only needs updating when a snapshot is triggered 
> (dump the cache to operator state). I don’t use timers in this case; instead I 
> keep a last-emit map (keyed by event key) to track when to flush downstream 
> within the ProcessFunction.
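
The approach described above can be sketched roughly as follows. This is plain Python, not Flink code, and every name in it (`CachedWindowEmulator`, `process`, `snapshot`, `restore`) is hypothetical. It assumes events for a key arrive in timestamp order, and `snapshot()` merely stands in for dumping the cache to operator state when a checkpoint is triggered.

```python
import copy


class CachedWindowEmulator:
    """In-memory per-key cache with a last-emit map (illustration only)."""

    def __init__(self, window_ms, emit_interval_ms):
        self.window_ms = window_ms
        self.emit_interval_ms = emit_interval_ms
        self.cache = {}      # key -> list of (timestamp_ms, value)
        self.last_emit = {}  # key -> timestamp of the last downstream flush

    def process(self, key, timestamp_ms, value):
        events = self.cache.setdefault(key, [])
        events.append((timestamp_ms, value))
        # First event for a key starts its emit clock.
        last = self.last_emit.setdefault(key, timestamp_ms)
        if timestamp_ms - last < self.emit_interval_ms:
            return None  # not yet time to flush for this key
        # Filter out events that fell out of the window, then emit.
        cutoff = timestamp_ms - self.window_ms
        events[:] = [(t, v) for t, v in events if t > cutoff]
        self.last_emit[key] = timestamp_ms
        return [v for _, v in events]

    def snapshot(self):
        # Stand-in for "dump cache to operator state" on checkpoint.
        return copy.deepcopy(self.cache)

    def restore(self, snapshot):
        self.cache = copy.deepcopy(snapshot)


op = CachedWindowEmulator(window_ms=30_000, emit_interval_ms=10_000)
op.process("a", 0, 1)
op.process("a", 5_000, 2)
print(op.process("a", 10_000, 3))  # [1, 2, 3]
```

Because nothing is written to state on the per-element path, the cost of a snapshot is one bulk write, which matches the observation that checkpointing the state as a blob completes quickly.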
> 
> 
> Thanks,
> Chen
> 
> 
>> On May 29, 2017, at 2:38 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>> 
>> Hi Chen,
>> 
>> How do you update the ValueState during checkpointing? I’m asking because a 
>> keyed state should always be scoped to a key and when checkpointing there is 
>> no key scope because we are not processing any incoming element and we’re 
>> not firing a timer (the two cases where we have a key scope).
>> 
>> Best,
>> Aljoscha
>> 
>>> On 24. May 2017, at 21:05, Chen Qin <qinnc...@gmail.com> wrote:
>>> 
>>> Got it! Looks like a 30-day window with a 10-second trigger creates way too 
>>> many windows (a quarter million every 10 seconds per key, around 150 keys). 
>>> 
>>> Just to add some background, I tried three ways to implement this large 
>>> sliding-window pipeline. All share the same configuration and use the RocksDB 
>>> state backend with remote storage on S3:
>>> 1. Out-of-the-box sliding window, 30 days with a 10-second trigger.
>>> 2. ProcessFunction with ListState.
>>> 3. ProcessFunction with an in-memory cache that updates ValueState during 
>>>    checkpoints and periodically filters and emits a list of events. 
>>>    Checkpointing the ValueState as a blob seems to complete quickly.
>>> The first two options have performance issues; the third works fine so far.
>>> 
>>> Thanks,
>>> Chen
>>> 
>>> On Wed, May 24, 2017 at 8:24 AM, Stefan Richter <s.rich...@data-artisans.com> wrote:
>>> Yes Carst, I noticed your version is already 1.2.1, which is why I contacted 
>>> Aljoscha to take a look here because he knows best about the expected 
>>> scalability of the sliding window implementation.
>>>  
>>>> On 24.05.2017 at 16:49, Carst Tankink <ctank...@bol.com> wrote:
>>>> 
>>>> Hi,
>>>>  
>>>> Thanks Aljoscha!
>>>> To complete my understanding: the problem here is that each element in the 
>>>> sliding window(s) basically triggers 240 get+put calls instead of just 1, 
>>>> right? I can see how that blows up :-) 
>>>> I have a good idea on how to proceed next, so I will be trying out writing 
>>>> the custom ProcessFunction next (week).
>>>>  
>>>> Stefan, in our case we are already on Flink 1.2.1 which should have the 
>>>> patched version of RocksDB, right? Because that patch did solve an issue 
>>>> we had in a different Flink job (a Kafka Source -> HDFS/Bucketing Sink 
>>>> which was stalling quite often under Flink 1.2.0) but did not solve this 
>>>> case, which fits the “way too much RocksDB access” explanation better.
>>>>  
>>>>  
>>>> Thanks again,
>>>> Carst
>>>>  
>>>> From: Aljoscha Krettek <aljos...@apache.org>
>>>> Date: Wednesday, May 24, 2017 at 16:13
>>>> To: Stefan Richter <s.rich...@data-artisans.com>
>>>> Cc: Carst Tankink <ctank...@bol.com>, "user@flink.apache.org" 
>>>> <user@flink.apache.org>
>>>> Subject: Re: large sliding window perf question
>>>>  
>>>> Hi, 
>>>>  
>>>> I’m afraid you’re running into a general shortcoming of the current 
>>>> sliding windows implementation: every sliding window is treated as its own 
>>>> window that has window contents and trigger state/timers. For example, if 
>>>> you have a sliding window of size 4 hours with 1 minute slide this means 
>>>> each element is in 240 windows and you basically amplify writing to 
>>>> RocksDB by 240. This gets out of hand very quickly with larger differences 
>>>> between window size and slide interval.
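
To make the amplification concrete, here is a small arithmetic sketch in Python. The helper name `windows_per_element` is hypothetical; it just divides window size by slide interval, which is the number of overlapping windows each element lands in when the size is a multiple of the slide.

```python
from datetime import timedelta


def windows_per_element(size, slide):
    """Number of sliding windows that contain any single element."""
    if size % slide != timedelta(0):
        raise ValueError("size must be a multiple of slide")
    return size // slide


# The example from this message: 4 hours with a 1-minute slide.
print(windows_per_element(timedelta(hours=4), timedelta(minutes=1)))   # 240

# The 30-day window firing every 10 seconds discussed in this thread.
print(windows_per_element(timedelta(days=30), timedelta(seconds=10)))  # 259200
```

The second figure is the "quarter million" mentioned elsewhere in the thread: every incoming element would touch roughly 259,200 window states.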
>>>>  
>>>> I’m also afraid there is no solution for this at the moment, so the 
>>>> workaround Chen mentioned is the way to go for now.
>>>>  
>>>> Best,
>>>> Aljoscha
>>>> On 24. May 2017, at 14:07, Stefan Richter <s.rich...@data-artisans.com> wrote:
>>>>  
>>>> Hi, 
>>>>  
>>>> both issues sound like the known problem with RocksDB merging state. 
>>>> Please take a look here
>>>>  
>>>> https://issues.apache.org/jira/browse/FLINK-5756
>>>>  
>>>> and here
>>>>  
>>>> https://github.com/facebook/rocksdb/issues/1988
>>>>  
>>>> Best,
>>>> Stefan
>>>>  
>>>>  
>>>> On 24.05.2017 at 14:33, Carst Tankink <ctank...@bol.com> wrote:
>>>>  
>>>> Hi,
>>>>  
>>>> We are seeing a similar behaviour for large sliding windows. Let me put 
>>>> some details here and see if they match up enough with Chen’s:
>>>>  
>>>> Technical specs:
>>>> - Flink 1.2.1 on YARN
>>>> - RocksDB backend, on HDFS. I’ve set the backend to 
>>>>   PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM since our Hadoop 
>>>>   cluster runs on spinning disks, but that doesn’t seem to help.
>>>>  
>>>> Pipeline:
>>>> - Read from Kafka, extract ids.
>>>> - KeyBy id, count occurrences of each id using a fold. The window size of 
>>>>   this operator is 10 minutes with a slide of 1 minute.
>>>> - KeyBy id (again), compute mean and standard deviation using a fold. The 
>>>>   window size of this operator is 4 hours with a slide of 1 minute.
>>>> - Post-process data, sink.
>>>>  
>>>> What I observe is:
>>>> - With a heap-based backend, the job runs really quickly (a couple of 
>>>>   minutes to process 7 days of Kafka data) but eventually goes OOM with a 
>>>>   "GC overhead limit exceeded" error.
>>>> - With the RocksDB backend, checkpoints get stuck most of the time, and 
>>>>   the “count occurrences” step gets a lot of back pressure from the next 
>>>>   operator (on the large window).
>>>>   o In the cases where the checkpoint does succeed, the state for the 
>>>>     large window is around 500-700 MB; other states are within the KBs.
>>>>   o Also in those cases, all the time seems to be spent in the 
>>>>     ‘alignment’ phase for a single subtask of the count operator, with the 
>>>>     other operators aligning within milliseconds. The checkpoint duration 
>>>>     itself is no more than 2 seconds, even for the larger states.
>>>>  
>>>>  
>>>> At this point, I’m a bit at a loss to figure out what’s going on. My best 
>>>> guess is it has to do with the state access to the RocksDBFoldingState, 
>>>> but why this is so slow is beyond me.
>>>>  
>>>> Hope this info helps in figuring out what is going on, and hopefully it is 
>>>> actually related to Chen’s case :)
>>>>  
>>>>  
>>>> Thanks,
>>>> Carst
>>>>  
>>>> From: Stefan Richter <s.rich...@data-artisans.com>
>>>> Date: Tuesday, May 23, 2017 at 21:35
>>>> To: "user@flink.apache.org" <user@flink.apache.org>
>>>> Subject: Re: large sliding window perf question
>>>>  
>>>> Hi,
>>>>  
>>>> Which state backend and Flink version are you using? There was a problem 
>>>> with large merging states on RocksDB, caused by some inefficiencies in the 
>>>> merge operator of RocksDB. We provide a custom patch for this with all 
>>>> newer versions of Flink. 
>>>>  
>>>> Best,
>>>> Stefan
>>>>  
>>>> On 23.05.2017 at 21:24, Chen Qin <qinnc...@gmail.com> wrote:
>>>>  
>>>> Hi there,
>>>>  
>>>> I have seen a weird perf issue while running an event-time job with a 
>>>> large sliding window (24 hours, sliding every 10s).
>>>>  
>>>> The pipeline is simple: tail a Kafka topic, assign timestamps and 
>>>> watermarks, forward to a large sliding window (30 days) that fires every 
>>>> 10 seconds, and print the result.
>>>>  
>>>> What I saw first-hand was checkpointing getting stuck and taking longer 
>>>> than the timeout, even though traffic volume is low (~300 TPS). Looking 
>>>> deeper, it seems back pressure kicks in: the window operator consumes 
>>>> messages really slowly and throttles the sources.
>>>>  
>>>> I also tried limiting the window size to minutes, and all the issues went away.
>>>>  
>>>> Any suggestions on this? My workaround is a ProcessFunction that keeps a 
>>>> big ValueState and periodically evaluates it and emits downstream 
>>>> (emulating what the sliding window does).
>>>>  
>>>> Thanks,
>>>> Chen
