Hi Ashish,

FIRE_AND_PURGE should also clear the window state. Yes, by active windows I mean windows which have not been purged yet.
Maybe Aljoscha knows more about why the window state is growing (I would not rule out a bug).

Cheers,
Till

On Tue, Jul 31, 2018 at 1:45 PM ashish pok <ashish...@yahoo.com> wrote:

> Hi Till,
>
> Keys are unbounded (a group of events has the same key, but that key doesn't
> repeat after it is fired, other than for some odd delayed events). So basically
> there is one key that will be aligned to a window. When you say key space of
> active windows, does that include keys for windows that have already fired
> and could still be in the memory footprint? If so, that is basically the
> problem I would run into, and I am looking for a way to clean it up. Like I
> said earlier, overriding the trigger to FIRE_AND_PURGE did not help. If I take
> the same stream and key and refactor it to how Chang is doing it with a
> ProcessFunction, the issue goes away.
>
> If you mean only the currently processing key space of active windows (not the
> ones that have already fired), then I would say that cannot be the case.
> We are getting the data from periodic polls of the same number of devices, and
> the uniqueness of the key is simply a time identifier prefixed to the device
> identifier. Even though there could be a little delayed data, the chance
> of the number of unique keys growing constantly for days is probably nil, as
> the device list is constant.
>
> Thanks, Ashish
>
> On Tuesday, July 31, 2018, 4:05 AM, Till Rohrmann <trohrm...@apache.org>
> wrote:
>
> Hi Ashish,
>
> the processing time session windows need to store state in the
> StateBackends, and I assume that your key space of active windows is
> constantly growing. That could explain why you are seeing an ever
> increasing memory footprint. But without knowing the input stream and what
> the UDFs do, this is only a guess.
>
> Cheers,
> Till
>
> On Mon, Jul 30, 2018 at 1:43 PM Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi Chang,
>
> The state handle objects are not created per key but just once per
> function instance.
> Instead they route state accesses to the backend (JVM heap or RocksDB) for
> the currently active key.
>
> Best, Fabian
>
> 2018-07-30 12:19 GMT+02:00 Chang Liu <fluency...@gmail.com>:
>
> Hi Andrey,
>
> Thanks for your reply. My question might be silly, but there is still one
> part I would like to fully understand. Take the following example:
>
> import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.util.Collector
>
> // Keyed by session ID. Click is the event type (definition not shown).
> class MyFunction extends KeyedProcessFunction[String, Click, Click] {
>   lazy val userId: ValueState[String] = getRuntimeContext.getState(
>     new ValueStateDescriptor[String]("userId", BasicTypeInfo.STRING_TYPE_INFO))
>
>   lazy val clicks: ListState[Click] = getRuntimeContext.getListState(
>     new ListStateDescriptor[Click]("clicks", createTypeInformation[Click]))
>
>   override def processElement(
>       click: Click,
>       context: KeyedProcessFunction[String, Click, Click]#Context,
>       out: Collector[Click]): Unit = {
>     // process, output, clear state if necessary
>   }
>
>   override def onTimer(
>       timestamp: Long,
>       ctx: KeyedProcessFunction[String, Click, Click]#OnTimerContext,
>       out: Collector[Click]): Unit = {
>     // output and clear state
>   }
> }
>
> Even though I am regularly clearing the two states, userId and clicks
> (which means I am cleaning up the values stored in the States), my question
> is: what about the two State objects themselves, userId and clicks?
> These State objects are also created per session ID, right? If the number
> of session IDs is unbounded, then the number of these State objects is
> also unbounded.
>
> That means there are *userId-state-1 and clicks-state-1 for session-id-1*,
> *userId-state-2 and clicks-state-2 for session-id-2*, *userId-state-3 and
> clicks-state-3 for session-id-3*, …, which are handled by different keyed
> operator instances (or by the same one, if two session IDs from different
> *ranges*, as you call them, are assigned to the same instance).
>
> I am not concerned with the actual values in the State (which will be managed
> carefully, as long as I clear them carefully). I am thinking about the State
> objects themselves: I have no idea what is happening to them or what
> will happen to them.
>
> Many thanks :)
>
> Best regards,
>
> Chang Liu 刘畅
>
> On 26 Jul 2018, at 10:55, Andrey Zagrebin <and...@data-artisans.com>
> wrote:
>
> Hi Chang Liu,
>
> The unbounded nature of the stream, keyed or not, should not lead to running
> out of memory.
>
> Flink's parallel keyed operator instances are fixed in number (the
> parallelism), and each just processes some range of keyed elements; in your
> example it is a subrange of session IDs.
>
> The keyed processed elements (HTTP requests) are objects created when they
> enter the pipeline and garbage collected after having been processed in
> streaming fashion.
>
> If they arrive very rapidly, it can lead to high back pressure propagating
> from downstream to upstream operators: buffers can become full and the
> pipeline stops or slows down processing external inputs. This usually means
> that your pipeline is under-provisioned.
>
> The only accumulated data comes from state (windows, user state, etc.), so
> if you control its memory consumption, as Till described, there should be
> no other source of out-of-memory errors.
>
> Cheers,
> Andrey
>
> On 25 Jul 2018, at 19:06, Chang Liu <fluency...@gmail.com> wrote:
>
> Hi Till,
>
> Thanks for your reply. But I think maybe I did not make my question clear.
> My question is not about whether the States within each keyed operator
> instance will run out of memory. My question is whether the
> unlimited keyed operator instances themselves will run out of memory.
>
> So, to reply to your answers: whether I use different State backends or
> regularly clean up the States (which is exactly what I am doing), it
> does not affect the number of keyed operator instances.
>
> I would like to know:
>
>    - Will the number of keyed operator instances (Java objects?)
>      grow unbounded?
>    - If so, will they run out of memory? This is not actually related to
>      the memory used by the keyed State inside.
>    - If not, then how does Flink manage these multiple keyed operator
>      instances?
>
> I think this needs more knowledge about how Flink works internally, to
> understand how keyed operator instances are created, maintained and
> destroyed. That's why I would like your help understanding this.
>
> Many thanks.
>
> Best regards,
>
> Chang Liu 刘畅
>
> On 24 Jul 2018, at 14:31, Till Rohrmann <trohrm...@apache.org> wrote:
>
> Hi Chang Liu,
>
> if you are dealing with an unlimited number of keys and keep state around
> for every key, then your state size will keep growing with the number of
> keys. If you are using the FsStateBackend, which keeps state in memory,
> you will eventually run into an OutOfMemoryError. One way to
> solve/mitigate this problem is to use the RocksDBStateBackend, which can go
> out of core.
>
> Alternatively, you would need to clean up your state before you run out of
> memory. One way to do this is to register a timer for every key which
> clears the state. But this only works if you don't amass too much state
> data before the timer is triggered. If you will, this solution is a kind
> of poor man's state TTL. The Flink community is currently developing a
> proper implementation of it which does not rely on additional timers (which
> increase the state footprint) [1].
>
> [1] https://issues.apache.org/jira/browse/FLINK-9510
>
> Cheers,
> Till
>
> On Tue, Jul 24, 2018 at 10:11 AM Chang Liu <fluency...@gmail.com> wrote:
>
> Dear All,
>
> I have questions regarding the keys. In general, the questions are:
>
>    - What happens if I am doing keyBy based on an unlimited number of keys?
>      How does Flink manage each KeyedStream under the hood? Will I get a
>      memory overflow, for example, if every KeyedStream associated with a
>      specific key is taking a certain amount of memory?
>    - BTW, I think it is fair to say that I have to clear my KeyedState
>      so that the memory used by this State is cleaned up regularly. But still,
>      I am wondering: even though I am regularly cleaning up State memory, what
>      happens to the memory used by the KeyedStream itself, if there is any?
>      And will it explode?
>
> Let me give an example to make this clear. Let's say we have a
>
> val requestStream: DataStream[HttpRequest]
>
> which is a stream of HTTP requests. By using the session ID as the
> key, we can obtain a KeyedStream per single session, as follows:
>
> val streamPerSession: KeyedStream[HttpRequest, String] =
>   requestStream.keyBy(_.sessionId)
>
> However, the session IDs are actually hash codes generated randomly by the
> web service/application, which means the number of sessions is
> unlimited (which is reasonable, because every time a user opens the
> application or logs in, he/she will get a new unique session).
>
> Then, the question is: will Flink eventually run out of memory because the
> number of sessions is unlimited (and because we are keying by the session
> ID)?
>
>    - If so, how can we properly manage this situation?
>    - If not, could you help me understand WHY?
>    - Let's also assume that we are regularly clearing the KeyedState, so
>      the memory used by the State will not explode.
>
> Many thanks, and looking forward to your reply :)
>
> Best regards,
>
> Chang Liu 刘畅
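
Fabian's point in this thread (state handles are created once per function instance and route each access to the backend entry for the currently active key) can be illustrated with a toy model. The sketch below is plain Scala and does not use Flink; ToyBackend, ToyValueState and ToyDemo are hypothetical names invented for illustration, and the real state backends are considerably more involved. It only shows why the number of handle objects stays fixed while the number of stored entries follows the number of keys that have not been cleared.

```scala
import scala.collection.mutable

// Toy stand-in for a keyed state backend (NOT Flink's implementation):
// one table per state name, each indexed by key.
final class ToyBackend {
  // In this toy model the caller sets the active key before each access.
  var currentKey: String = ""

  private val tables =
    mutable.Map.empty[String, mutable.Map[String, Any]]

  def table(name: String): mutable.Map[String, Any] =
    tables.getOrElseUpdate(name, mutable.Map.empty[String, Any])

  // Total number of stored (state name, key) entries.
  def entryCount: Int = tables.values.map(_.size).sum
}

// Toy stand-in for a ValueState handle. It holds no data itself; every
// call is routed to the backend entry for the currently active key.
final class ToyValueState[T](name: String, backend: ToyBackend) {
  def value(): Option[T] =
    backend.table(name).get(backend.currentKey).map(_.asInstanceOf[T])

  def update(v: T): Unit =
    backend.table(name).update(backend.currentKey, v)

  def clear(): Unit =
    backend.table(name).remove(backend.currentKey)
}

object ToyDemo {
  // Returns (entries before clearing, entries after clearing).
  def run(): (Int, Int) = {
    val backend = new ToyBackend
    // The handle is created once, no matter how many keys are seen.
    val userId = new ToyValueState[String]("userId", backend)

    for (i <- 1 to 1000) {
      backend.currentKey = s"session-$i"
      userId.update(s"user-$i")
    }
    val before = backend.entryCount

    // Clearing per key removes the entry; the single handle remains.
    for (i <- 1 to 1000) {
      backend.currentKey = s"session-$i"
      userId.clear()
    }
    (before, backend.entryCount)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

In this toy model, a thousand session IDs produce a thousand backend entries but still only one ToyValueState object, and clearing the state for each key brings the entry count back to zero.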