Hi Ashish,

FIRE_AND_PURGE should also clear the window state. And yes, by active
windows I mean windows which have not been purged yet.

Maybe Aljoscha knows more about why the window state is growing (I would
not rule out a bug).


On Tue, Jul 31, 2018 at 1:45 PM ashish pok <ashish...@yahoo.com> wrote:

> Hi Till,
> Keys are unbounded (a group of events has the same key, but that key doesn't
> repeat after it is fired, other than some odd delayed events). So basically
> there is 1 key that will be aligned to a window. When you say key space of
> active windows, does that include keys for windows that have already fired
> and could still be in the memory footprint? If so, that is basically the
> problem I would run into, and I am looking for a way to clean it up. Like I
> said earlier, overriding the trigger to FIRE_AND_PURGE did not help. If I
> take the same stream and key and refactor it to how Chang is doing it with a
> Process Function, the issue goes away.
> If you mean only the key space of active windows currently being processed
> (not the ones that have already fired), then I would say that cannot be the
> case. We are getting the data from periodic polls of the same number of
> devices, and the uniqueness of the key is simply a time identifier prefixed
> to the device identifier. Even though there could be a little delayed data,
> the chance of the number of unique keys growing constantly for days is
> practically nil, as the device list is constant.
> Thanks, Ashish
> - Ashish
> On Tuesday, July 31, 2018, 4:05 AM, Till Rohrmann <trohrm...@apache.org>
> wrote:
> Hi Ashish,
> the processing time session windows need to store state in the
> StateBackends and I assume that your key space of active windows is
> constantly growing. That could explain why you are seeing an ever
> increasing memory footprint. But without knowing the input stream and what
> the UDFs do this is only a guess.
> Cheers,
> Till
> On Mon, Jul 30, 2018 at 1:43 PM Fabian Hueske <fhue...@gmail.com> wrote:
> Hi Chang,
> The state handle objects are not created per key but just once per
> function instance.
> Instead they route state accesses to the backend (JVM heap or RocksDB) for
> the currently active key.
> Best, Fabian
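To make Fabian's point concrete, here is a minimal plain-Scala sketch of the idea. It is not Flink code: ToyBackend, ToyValueState and StateHandleDemo are hypothetical names invented for illustration, and the real backends are far more involved. It only shows that a single handle object can serve any number of keys, because the per-key data lives in the backend and is looked up under the key the runtime has currently set.

```scala
import scala.collection.mutable

// Toy stand-in for a keyed state backend (hypothetical, not Flink's implementation):
// one table per state name, each table indexed by the record key.
final class ToyBackend {
  // Set by the "runtime" before each element is processed.
  var currentKey: String = ""
  private val tables = mutable.Map.empty[String, mutable.Map[String, Any]]

  def table(stateName: String): mutable.Map[String, Any] =
    tables.getOrElseUpdate(stateName, mutable.Map.empty[String, Any])

  // Total number of per-key entries held across all states.
  def storedEntries: Int = tables.values.map(_.size).sum
}

// Toy stand-in for a ValueState handle: created once, holds no per-key data itself.
final class ToyValueState[T](name: String, backend: ToyBackend) {
  def value(): Option[T] =
    backend.table(name).get(backend.currentKey).map(_.asInstanceOf[T])

  def update(v: T): Unit =
    backend.table(name).update(backend.currentKey, v)

  def clear(): Unit = {
    backend.table(name).remove(backend.currentKey)
    ()
  }
}

object StateHandleDemo {
  // Returns (entries stored before clearing, entries stored after clearing).
  def run(): (Int, Int) = {
    val backend = new ToyBackend
    // One handle for all keys, like the lazy vals in MyFunction.
    val userId = new ToyValueState[String]("userId", backend)

    for (i <- 1 to 1000) {
      backend.currentKey = s"session-$i"
      userId.update(s"user-$i")
    }
    val before = backend.storedEntries

    for (i <- 1 to 1000) {
      backend.currentKey = s"session-$i"
      userId.clear()
    }
    (before, backend.storedEntries)
  }
}
```

In this model the only thing that grows with the number of session IDs is the backend table, and clearing the state for a key removes its entry again. The handle object count stays at one throughout.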
> 2018-07-30 12:19 GMT+02:00 Chang Liu <fluency...@gmail.com>:
> Hi Andrey,
> Thanks for your reply. My question might be silly, but there is still one
> part I would like to fully understand. For example, in the following
> example:
> // keyed by Session ID
> class MyFunction extends KeyedProcessFunction[String, Click, Click] {
>   lazy val userId: ValueState[String] = getRuntimeContext.getState(
>     new ValueStateDescriptor[String]("userId", BasicTypeInfo.STRING_TYPE_INFO))
>   lazy val clicks: ListState[Click] = getRuntimeContext.getListState(
>     new ListStateDescriptor[Click]("clicks", createTypeInformation[Click]))
>   override def processElement(
>       click: Click,
>       context: KeyedProcessFunction[String, Click, Click]#Context,
>       out: Collector[Click])
>   : Unit = {
>     // process, output, clear state if necessary
>   }
>   override def onTimer(
>       timestamp: Long,
>       ctx: KeyedProcessFunction[String, Click, Click]#OnTimerContext,
>       out: Collector[Click])
>   : Unit = {
>     // output and clear state
>   }
> }
> Even though I am regularly clearing the two states, userId and clicks
> (which means I am cleaning up the values stored in the States), my question
> is: what about the two State objects themselves, userId and clicks?
> These State objects are also created per Session ID, right? If the number
> of Session IDs is unbounded, then the number of these State objects is
> also unbounded.
> That means, there are *userId-state-1 and clicks-state-1 for session-id-1*,
> *userId-state-2 and clicks-state-2 for session-id-2*, *userId-state-3 and
> clicks-state-3 for session-id-3*, …, which are handled by different (or
> same if two from different *range*, as you call it, are assigned to the
> same one) keyed operator instance.
> I am not concerned about the actual values in the State (which will be
> fine as long as I clear them carefully). I am thinking about the State
> objects themselves: I have no idea what is happening to them and what
> will happen to them.
> Many thanks :)
> Best regards/祝好,
> Chang Liu 刘畅
> On 26 Jul 2018, at 10:55, Andrey Zagrebin <and...@data-artisans.com>
> wrote:
> Hi Chang Liu,
> The unbounded nature of the stream, keyed or not, should not by itself lead
> to running out of memory.
> Flink's parallel keyed operator instances are fixed in number (the
> parallelism) and each just processes some range of keyed elements; in your
> example, that is a subrange of session IDs.
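As a rough illustration of why the number of operator instances does not grow with the number of keys, here is a small plain-Scala sketch. It rests on assumptions: as far as I know, Flink hashes each key into one of a fixed number of key groups (the max parallelism) and gives each parallel instance a contiguous range of key groups. The hash below is a simplification using plain hashCode rather than Flink's actual hash, and KeyRoutingDemo and its methods are hypothetical names.

```scala
object KeyRoutingDemo {
  // Simplified key-group assignment (assumption: Flink applies an extra hash
  // on top of hashCode; floorMod stands in for it here).
  def keyGroup(key: String, maxParallelism: Int): Int =
    Math.floorMod(key.hashCode, maxParallelism)

  // Maps a key group onto one of `parallelism` operator instances, so that
  // each instance owns a contiguous range of key groups.
  def operatorIndex(key: String, parallelism: Int, maxParallelism: Int): Int =
    keyGroup(key, maxParallelism) * parallelism / maxParallelism

  // Routes `numKeys` distinct session IDs and returns the set of operator
  // indices that received at least one key.
  def run(numKeys: Int, parallelism: Int, maxParallelism: Int): Set[Int] =
    (1 to numKeys)
      .map(i => operatorIndex(s"session-$i", parallelism, maxParallelism))
      .toSet
}
```

However many session IDs are fed in, the returned indices only ever fall between 0 and parallelism - 1. New keys are routed to existing instances instead of creating new ones.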
> The keyed elements being processed (HTTP requests) are objects created when
> they enter the pipeline and garbage collected after having been processed
> in streaming fashion.
> If they arrive very rapidly, it can lead to high back pressure propagating
> from downstream to upstream operators: buffers can become full and the
> pipeline stops or slows down processing external inputs. This usually means
> that your pipeline is under-provisioned.
> The only accumulated data comes from state (windows, user state etc), so
> if you control its memory consumption, as Till described, there should be
> no other source of out of memory.
> Cheers,
> Andrey
> On 25 Jul 2018, at 19:06, Chang Liu <fluency...@gmail.com> wrote:
> Hi Till,
> Thanks for your reply. But I think maybe I did not make my question clear.
> My question is not about whether the States within each keyed operator
> instance will run out of memory. My question is whether the unlimited
> number of keyed operator instances themselves will run out of memory.
> So, in reply to your answers: whether I use a different State backend or
> regularly clean up the States (which is exactly what I am doing), neither
> affects the number of keyed operator instances.
> I would like to know:
>    - Will the number of keyed operator instances (Java objects?) grow
>    unbounded?
>    - If so, will they run out of memory? This is not actually related to
>    the memory used by the keyed State inside.
>    - If not, then how is Flink managing these multiple keyed operator
>    instances?
> I think this needs more knowledge about how Flink works internally to
> understand how keyed operator instances are created, maintained and
> destroyed. That’s why I would like your help understanding this.
> Many Thanks.
> Best regards/祝好,
> Chang Liu 刘畅
> On 24 Jul 2018, at 14:31, Till Rohrmann <trohrm...@apache.org> wrote:
> Hi Chang Liu,
> if you are dealing with an unlimited number of keys and keep state around
> for every key, then your state size will keep growing with the number of
> keys. If you are using the FsStateBackend, which keeps its working state on
> the JVM heap, you will eventually run into an OutOfMemoryError. One way to
> solve/mitigate this problem is to use the RocksDBStateBackend, which can go
> out of core.
> Alternatively, you would need to clean up your state before you run out of
> memory. One way to do this is to register a timer for every key which
> clears the state. But this only works if you don't amass too much state
> data before the timer is triggered. If you like, this solution is a kind
> of poor man's state TTL. The Flink community is currently developing a
> proper implementation of it which does not rely on additional timers (which
> increase the state footprint) [1].
> [1] https://issues.apache.org/jira/browse/FLINK-9510
> Cheers,
> Till
> On Tue, Jul 24, 2018 at 10:11 AM Chang Liu <fluency...@gmail.com> wrote:
> Dear All,
> I have questions regarding the keys. In general, the questions are:
>    - What happens if I am doing keyBy based on an unlimited number of keys?
>    How is Flink managing each KeyedStream under the hood? Will I get a memory
>    overflow, for example, if every KeyedStream associated with a specific key
>    is taking a certain amount of memory?
>    - BTW, I think it is fair to say that I have to clear my KeyedState
>    so that the memory used by these States is cleaned up regularly. But still,
>    I am wondering: even though I am regularly cleaning up State memory, what
>    happens to the memory used by the KeyedStream itself, if there is any? And
>    will it explode?
> Let me give an example for understanding it clearly.  Let’s say we have a
> val requestStream: DataStream[HttpRequest]
> which is a stream of HTTP requests. And by using the session ID as the
> key, we can obtain a KeyedStream per single session, as follows:
>         val streamPerSession: KeyedStream[HttpRequest, String] =
>           requestStream.keyBy(_.sessionId)
> However, the session IDs are actually hash codes generated randomly by the
> Web service/application, which means the number of sessions is
> unlimited (which is reasonable, because every time a user opens the
> application or logs in, he/she will get a new unique session).
> Then, the question is: will Flink eventually run out of memory because the
> number of sessions is unlimited (and because we are keying by the session
> ID)?
>    - If so, how can we properly manage this situation?
>    - If not, could you help me understand WHY?
>    - Let’s also assume that, we are regularly clearing the KeyedState, so
>    the memory used by the State will not explode.
> Many Thanks and Looking forward to your reply :)
> Best regards/祝好,
> Chang Liu 刘畅
