B.T.W. it might be better to pre-aggregate via a sliding window with a controlled bucket size, batched updates, and retention.

Thanks,
Chen
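For illustration, a minimal sketch of that pre-aggregation idea, assuming a stream of (id, count) pairs and a 10-second bucket size (both are example choices, not from the thread); the large window downstream then only has to combine one small partial aggregate per bucket instead of touching every raw event:

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class PreAggregation {

    // Stage 1: collapse raw events into one partial count per key per 10-second bucket.
    // A 30-day retention then means keeping at most 30 * 24 * 3600 / 10 = 259,200 bucket
    // aggregates per key downstream (e.g. in a ProcessFunction that evicts old buckets),
    // instead of updating that many overlapping window panes per incoming event.
    public static DataStream<Tuple2<String, Long>> preAggregate(DataStream<Tuple2<String, Long>> events) {
        return events
            .keyBy(0)
            .window(TumblingEventTimeWindows.of(Time.seconds(10)))   // controlled bucket size
            .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                @Override
                public Tuple2<String, Long> reduce(Tuple2<String, Long> a, Tuple2<String, Long> b) {
                    return new Tuple2<>(a.f0, a.f1 + b.f1);          // batched update within the bucket
                }
            });
    }
}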
> On May 29, 2017, at 3:05 PM, Chen Qin <qinnc...@gmail.com> wrote:
>
> I see, not sure this hack works. It utilizes operator state to hold all <key, state> mappings assigned to that operator instance.
>
> If keyBy can produce a deterministic mapping from upstream events to a fixed operator parallelism, then the operator state could hold the mapping between keys and their states, and updates are only needed when a snapshot is triggered (dump the cache to operator state). I don't use a timer in this case, but keep a last-emit map (keyed by event key) to track when to flush downstream within the processFunction.
>
> Thanks,
> Chen
>
>> On May 29, 2017, at 2:38 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>
>> Hi Chen,
>>
>> How do you update the ValueState during checkpointing? I'm asking because keyed state should always be scoped to a key, and when checkpointing there is no key scope because we are not processing any incoming element and we are not firing a timer (the two cases where we have a key scope).
>>
>> Best,
>> Aljoscha
>>
>>> On 24. May 2017, at 21:05, Chen Qin <qinnc...@gmail.com> wrote:
>>>
>>> Got it! Looks like a 30-day window with a 10-second trigger is way too many windows (a quarter million every 10 seconds per key, around 150 keys).
>>>
>>> Just to add some background, I tried three ways to implement this large sliding window pipeline; all share the same configuration and use the RocksDB state backend with remote checkpoints on S3:
>>> - out-of-the-box sliding window, 30 days with a 10-second trigger
>>> - ProcessFunction with list state
>>> - ProcessFunction with an in-memory cache, updating a value state during checkpoint and periodically filtering and emitting a list of events. Checkpointing the value state as a blob seems to complete quickly.
>>> The first two options show the perf issue; the third one works fine so far.
>>>
>>> Thanks,
>>> Chen
>>>
>>> On Wed, May 24, 2017 at 8:24 AM, Stefan Richter <s.rich...@data-artisans.com> wrote:
>>> Yes Carst, I noticed your version is already 1.2.1, which is why I contacted Aljoscha to take a look here, because he knows best about the expected scalability of the sliding window implementation.
>>>
>>>> On 24.05.2017 at 16:49, Carst Tankink <ctank...@bol.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> Thanks Aljoscha!
>>>> To complete my understanding: the problem here is that each element in the sliding window(s) basically triggers 240 get+put calls instead of just 1, right? I can see how that blows up :-)
>>>> I have a good idea of how to proceed, so I will try writing the custom ProcessFunction next (week).
>>>>
>>>> Stefan, in our case we are already on Flink 1.2.1, which should have the patched version of RocksDB, right? That patch did solve an issue we had in a different Flink job (a Kafka Source -> HDFS/Bucketing Sink which was stalling quite often under Flink 1.2.0), but it did not solve this case, which fits the "way too much RocksDB access" explanation better.
>>>>
>>>> Thanks again,
>>>> Carst
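As a rough sketch of what such a custom ProcessFunction workaround could look like (not Chen's actual implementation): keyed state is updated in processElement, where a key scope exists (per Aljoscha's point), and a per-key last-emit timestamp decides when to flush downstream instead of a timer. Event and Aggregate are hypothetical application types, and the 10-second interval is just an example:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Applied on a keyed stream: stream.keyBy(...).process(new RollingAggregateFunction())
public class RollingAggregateFunction extends ProcessFunction<RollingAggregateFunction.Event, RollingAggregateFunction.Aggregate> {

    private static final long EMIT_INTERVAL_MS = 10_000L;   // flush every 10 seconds (example value)

    private transient ValueState<Aggregate> aggState;       // per-key rolling aggregate
    private transient ValueState<Long> lastEmitState;       // per-key last flush time

    @Override
    public void open(Configuration parameters) {
        aggState = getRuntimeContext().getState(new ValueStateDescriptor<>("agg", Aggregate.class));
        lastEmitState = getRuntimeContext().getState(new ValueStateDescriptor<>("lastEmit", Long.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Aggregate> out) throws Exception {
        // One state read and write per element, instead of one per overlapping window pane.
        Aggregate agg = aggState.value();
        if (agg == null) {
            agg = new Aggregate();
        }
        agg.add(event);
        aggState.update(agg);

        // Emit at most once per interval, tracked per key ("last emit map" keyed by event key).
        long now = ctx.timerService().currentProcessingTime();
        Long lastEmit = lastEmitState.value();
        if (lastEmit == null || now - lastEmit >= EMIT_INTERVAL_MS) {
            out.collect(agg);
            lastEmitState.update(now);
        }
    }

    // Hypothetical application types, only here to make the sketch self-contained.
    public static class Event { public String key; public long value; }
    public static class Aggregate {
        public long sum;
        public long count;
        public void add(Event e) { sum += e.value; count++; }
    }
}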
>>>> From: Aljoscha Krettek <aljos...@apache.org>
>>>> Date: Wednesday, May 24, 2017 at 16:13
>>>> To: Stefan Richter <s.rich...@data-artisans.com>
>>>> Cc: Carst Tankink <ctank...@bol.com>, "user@flink.apache.org" <user@flink.apache.org>
>>>> Subject: Re: large sliding window perf question
>>>>
>>>> Hi,
>>>>
>>>> I'm afraid you're running into a general shortcoming of the current sliding windows implementation: every sliding window is treated as its own window that has window contents and trigger state/timers. For example, if you have a sliding window of size 4 hours with a 1-minute slide, each element is in 240 windows, and you basically amplify writes to RocksDB by 240. This gets out of hand very quickly with larger differences between window size and slide interval.
>>>>
>>>> I'm also afraid there is no solution for this right now, so the workaround Chen mentioned is the way to go.
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>> On 24. May 2017, at 14:07, Stefan Richter <s.rich...@data-artisans.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> Both issues sound like the known problem with RocksDB merging state. Please take a look here
>>>>
>>>> https://issues.apache.org/jira/browse/FLINK-5756
>>>>
>>>> and here
>>>>
>>>> https://github.com/facebook/rocksdb/issues/1988
>>>>
>>>> Best,
>>>> Stefan
>>>>
>>>> On 24.05.2017 at 14:33, Carst Tankink <ctank...@bol.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> We are seeing similar behaviour for large sliding windows. Let me put some details here and see if they match up enough with Chen's:
>>>>
>>>> Technical specs:
>>>> - Flink 1.2.1 on YARN
>>>> - RocksDB backend, on HDFS. I've set the backend to PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM since our Hadoop cluster runs on spinning disks, but that doesn't seem to help.
>>>>
>>>> Pipeline:
>>>> - Read from Kafka, extract ids.
>>>> - KeyBy id, count occurrences of each id using a fold. The window size of this operator is 10 minutes with a slide of 1 minute.
>>>> - KeyBy id (again), compute mean and standard deviation using a fold. The window size of this operator is 4 hours with a slide of 1 minute.
>>>> - Post-process data, sink.
>>>>
>>>> What I observe is:
>>>> - With a heap-based backend, the job runs really quickly (a couple of minutes to process 7 days of Kafka data) but eventually goes OOM with a "GC overhead limit exceeded" error.
>>>> - With the RocksDB backend, checkpoints get stuck most of the time, and the "count occurrences" step gets a lot of back pressure from the next operator (on the large window).
>>>>   o In the cases where the checkpoint does succeed, the state for the large window is around 500-700 MB; other states are within KBs.
>>>>   o Also in those cases, all time seems to be spent in the 'alignment' phase for a single subtask of the count operator, with the other operators aligning within milliseconds. The checkpoint duration itself is no more than 2 seconds, even for the larger states.
>>>>
>>>> At this point, I'm a bit at a loss to figure out what's going on. My best guess is that it has to do with the state access to the RocksDBFoldingState, but why this is so slow is beyond me.
>>>>
>>>> Hope this info helps in figuring out what is going on, and hopefully it is actually related to Chen's case :)
>>>>
>>>> Thanks,
>>>> Carst
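For reference, a short snippet of how the RocksDB predefined options mentioned in that mail are typically applied when configuring the job (the checkpoint URI is a placeholder, not taken from the thread):

import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BackendConfig {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint path is a placeholder; options are tuned for spinning disks as described above.
        RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints");
        backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
        env.setStateBackend(backend);
    }
}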
>>>> From: Stefan Richter <s.rich...@data-artisans.com>
>>>> Date: Tuesday, May 23, 2017 at 21:35
>>>> To: "user@flink.apache.org" <user@flink.apache.org>
>>>> Subject: Re: large sliding window perf question
>>>>
>>>> Hi,
>>>>
>>>> Which state backend and Flink version are you using? There was a problem with large merging states on RocksDB, caused by some inefficiencies in the merge operator of RocksDB. We provide a custom patch for this with all newer versions of Flink.
>>>>
>>>> Best,
>>>> Stefan
>>>>
>>>> On 23.05.2017 at 21:24, Chen Qin <qinnc...@gmail.com> wrote:
>>>>
>>>> Hi there,
>>>>
>>>> I have seen a weird perf issue while running an event-time-based job with a large sliding window (24 hours, offset every 10 s).
>>>>
>>>> The pipeline looks simple: tail a Kafka topic, assign timestamps and watermarks, forward to a large sliding window (30 days) that fires every 10 seconds, and print the output.
>>>>
>>>> What I saw first hand was checkpointing getting stuck, taking longer than the timeout even though the traffic volume is low (~300 TPS). Looking deeper, it seems back pressure kicks in: the window operator consumes messages really slowly and throttles the sources.
>>>>
>>>> I also tried limiting the window time to minutes and all issues are gone.
>>>>
>>>> Any suggestion on this? My workaround is that I implemented a processFunction that keeps a big value state and periodically evaluates and emits downstream (emulating what the sliding window does).
>>>>
>>>> Thanks,
>>>> Chen
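For reference, a rough sketch of the pipeline shape described in that first mail (the Kafka source is omitted; (key, event-time) pairs and a 30-second out-of-orderness bound are illustrative assumptions, not from the thread). With a 30-day window sliding every 10 seconds, each element belongs to 30 * 24 * 3600 / 10 = 259,200 overlapping panes, which matches the "quarter million" figure mentioned earlier and is where the write amplification comes from:

import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class LargeSlidingWindowJob {

    // Events are assumed to be (key, event-time millis) pairs; the Kafka source is omitted.
    public static DataStream<Long> countPerKey(DataStream<Tuple2<String, Long>> events) {
        return events
            .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(30)) {
                    @Override
                    public long extractTimestamp(Tuple2<String, Long> e) {
                        return e.f1;   // event-time timestamp carried in the record
                    }
                })
            .keyBy(0)
            // 30-day window, 10-second slide: each element lands in 259,200 overlapping panes,
            // so every incoming record triggers that many window-state updates.
            .window(SlidingEventTimeWindows.of(Time.days(30), Time.seconds(10)))
            .fold(0L, new FoldFunction<Tuple2<String, Long>, Long>() {
                @Override
                public Long fold(Long count, Tuple2<String, Long> event) {
                    return count + 1;   // running count of events per key in the window
                }
            });
    }
}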