Re: Questions on Unbounded number of keys

2018-07-31 Thread ashish pok
Thanks Till, I will try to create an instance of the app with a smaller heap and get 
a couple of dumps as well. I should be OK to share that on Google Drive. 


- Ashish


Re: Questions on Unbounded number of keys

2018-07-31 Thread Till Rohrmann
Hi Ashish,

FIRE_AND_PURGE should also clear the window state. Yes, by active
windows I mean windows that have not been purged yet.

Maybe Aljoscha knows more about why the window state is growing (I would
not rule out a bug).

Cheers,
Till


Re: Questions on Unbounded number of keys

2018-07-31 Thread ashish pok
Hi Till,
Keys are unbounded (a group of events shares the same key, but that key doesn't 
repeat after its window fires, other than some odd delayed events). So basically 
there is one key aligned to each window. When you say key space of active 
windows, does that include keys for windows that have already fired and could 
still be in the memory footprint? If so, that is basically the problem I am 
running into and am looking for a way to clean up. Like I said earlier, 
overriding the trigger to FIRE_AND_PURGE did not help. If I take the same stream 
and key and refactor it to how Chang is doing it with a ProcessFunction, the 
issue goes away.
If you mean only the key space of currently processing active windows (not the 
ones that have already fired), then I would say that cannot be the case. We are 
getting the data from periodic polls of the same set of devices, and the 
uniqueness of a key is simply a time identifier prefixed to a device identifier. 
Even though there could be a little delayed data, the chance of the number of 
unique keys growing constantly for days is practically none, as the device list 
is constant.
Thanks, Ashish


- Ashish


Re: Questions on Unbounded number of keys

2018-07-31 Thread Till Rohrmann
Hi Ashish,

processing-time session windows need to store state in the state backend,
and I assume that your key space of active windows is constantly growing.
That could explain why you are seeing an ever-increasing memory footprint.
But without knowing the input stream and what the UDFs do, this is only a
guess.

Cheers,
Till

On Mon, Jul 30, 2018 at 1:43 PM Fabian Hueske  wrote:

> Hi Chang,
>
> The state handle objects are not created per key but just once per
> function instance.
> Instead they route state accesses to the backend (JVM heap or RocksDB) for
> the currently active key.
>
> Best, Fabian
>
> 2018-07-30 12:19 GMT+02:00 Chang Liu :
>
>> Hi Andrey,
>>
>> Thanks for your reply. My question might be silly, but there is still one
>> part I would like to fully understand. For example, in the following
>> example:
>>
>> class MyFunction extends KeyedProcessFunction[String, Click, Click] { // 
>> keyed by Session ID
>>   lazy val userId: ValueState[String] = getRuntimeContext.getState(
>> new ValueStateDescriptor[String]("userId", 
>> BasicTypeInfo.STRING_TYPE_INFO))
>>
>>   lazy val clicks: ListState[Click] = getRuntimeContext.getListState(
>> new ListStateDescriptor[Click]("clicks", createTypeInformation[Click]))
>>
>>   override def processElement(
>>   click: Click,
>>   context: KeyedProcessFunction[String, Click, Click]#Context,
>>   out: Collector[Click])
>>   : Unit = {
>> // process, output, clear state if necessary
>>   }
>>
>>   override def onTimer(
>>   timestamp: Long,
>>   ctx: KeyedProcessFunction[String, Click, Click]#OnTimerContext,
>>   out: Collector[Click])
>>   : Unit = {
>> // output and clear state
>>   }
>> }
>>
>>
>> Even though I am regularly clearing the two states, userId and clicks
>> (which means I am cleaning up the values stored in the States), my question
>> is: then what about the two State objects themselves: userId and clicks?
>> These States objects are also created per Session ID right? If the number
>> of Session IDs are unbounded, than the number of these State objects are
>> also unbounded.
>>
>> That means, there are *userId-state-1 and clicks-state-1 for
>> session-id-1*, *userId-state-2 and clicks-state-2 for session-id-2*, 
>> *userId-state-3
>> and clicks-state-3 for session-id-3*, …, which are handled by different
>> (or same if two from different *range*, as you call it, are assigned to
>> the same one) keyed operator instance.
>>
>> I am not concerning the actual value in the State (which will be managed
>> carefully, if I am clearing them carefully). I am thinking about the State
>> objects themselves, which I have no idea what is happening to them and what
>> will happen to them.
>>
>> Many thanks :)
>>
>> Best regards/祝好,
>>
>> Chang Liu 刘畅
>>
>>
>> On 26 Jul 2018, at 10:55, Andrey Zagrebin 
>> wrote:
>>
>> Hi Chang Liu,
>>
>> The unbounded nature of the stream keyed or not should not lead to out of
>> memory.
>>
>> Flink parallel keyed operator instances have fixed number (parallelism)
>> and just process some range of keyed elements, in your example it is a
>> subrange of session ids.
>>
>> The keyed processed elements (http requests) are objects created when
>> they enter the pipeline and garage collected after having been processed in
>> streaming fashion.
>>
>> If they arrive very rapidly it can lead to high back pressure from
>> upstream to downstream operators, buffers can become full and pipeline
>> stops/slows down processing external inputs, it usually means that your
>> pipeline is under provisioned.
>>
>> The only accumulated data comes from state (windows, user state etc), so
>> if you control its memory consumption, as Till described, there should be
>> no other source of out of memory.
>>
>> Cheers,
>> Andrey
>>
>> On 25 Jul 2018, at 19:06, Chang Liu  wrote:
>>
>> Hi Till,
>>
>> Thanks for your reply. But I think maybe I did not make my question
>> clear. My question is not about whether the States within each keyed
>> operator instances will run out of memory. My question is about, whether
>> the unlimited keyed operator instances themselves will run out of memory.
>>
>> So to reply to your answers, no matter using different State backends or
>> regularly cleaning up the States (which is exactly what I am doing), it
>> does not concern the number of keyed operator instances.
>>
>> I would like to know:
>>
>>- Will the number of keyed operator instances (Java objects?) grow
>>unbounded?
>>- If so, will they run out of memory? This is not actually related to
>>the memory used by the keyed Stated inside.
>>- If not, then how Flink is managing this multiple keyed operator
>>instances?
>>
>>
>> I think this needs more knowledge about how Flink works internally to
>> understand how keyed operator instances are created, maintained and
>> destroyed. That’s why I would like your help understanding this.
>>
>> Many Thanks.
>>
>> Best regards/祝好,
>>
>> Chang Liu 刘畅
>>
>>
>> 

Re: Questions on Unbounded number of keys

2018-07-30 Thread Fabian Hueske
Hi Chang,

The state handle objects are not created per key but just once per function
instance.
Instead they route state accesses to the backend (JVM heap or RocksDB) for
the currently active key.

Best, Fabian
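
A minimal Scala sketch of what this means in practice (hypothetical CounterFn; 
the per-key count is only illustrative): the single handle below is created once 
per parallel function instance, and every value()/update() call is resolved by 
the backend against the key of the element currently being processed.

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

class CounterFn extends KeyedProcessFunction[String, String, (String, Long)] {

  // One handle per parallel subtask, NOT one per key.
  lazy val count: ValueState[java.lang.Long] = getRuntimeContext.getState(
    new ValueStateDescriptor[java.lang.Long]("count", classOf[java.lang.Long]))

  override def processElement(
      value: String,
      ctx: KeyedProcessFunction[String, String, (String, Long)]#Context,
      out: Collector[(String, Long)]): Unit = {
    // value()/update() read and write the backend entry for ctx.getCurrentKey.
    val next = Option(count.value()).map(_ + 1L).getOrElse(1L)
    count.update(next)
    out.collect((ctx.getCurrentKey, next))
  }
}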


Re: Questions on Unbounded number of keys

2018-07-30 Thread Chang Liu
Hi Andrey,

Thanks for your reply. My question might be silly, but there is still one part 
I would like to fully understand. Consider the following example:

class MyFunction extends KeyedProcessFunction[String, Click, Click] { // keyed by Session ID

  lazy val userId: ValueState[String] = getRuntimeContext.getState(
    new ValueStateDescriptor[String]("userId", BasicTypeInfo.STRING_TYPE_INFO))

  lazy val clicks: ListState[Click] = getRuntimeContext.getListState(
    new ListStateDescriptor[Click]("clicks", createTypeInformation[Click]))

  override def processElement(
      click: Click,
      context: KeyedProcessFunction[String, Click, Click]#Context,
      out: Collector[Click]): Unit = {
    // process, output, clear state if necessary
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[String, Click, Click]#OnTimerContext,
      out: Collector[Click]): Unit = {
    // output and clear state
  }
}

Even though I am regularly clearing the two states, userId and clicks (which 
means I am cleaning up the values stored in the States), my question is: what 
about the two State objects themselves, userId and clicks? These State objects 
are also created per Session ID, right? If the number of Session IDs is 
unbounded, then the number of these State objects is also unbounded.

That means there are userId-state-1 and clicks-state-1 for session-id-1, 
userId-state-2 and clicks-state-2 for session-id-2, userId-state-3 and 
clicks-state-3 for session-id-3, …, which are handled by different keyed 
operator instances (or the same one, if two keys from different ranges, as you 
call them, are assigned to the same instance).

I am not concerned about the actual values in the State (which will be managed 
carefully, if I am clearing them carefully). I am thinking about the State 
objects themselves; I have no idea what is happening to them or what will 
happen to them.

Many thanks :)

Best regards/祝好,

Chang Liu 刘畅



Re: Questions on Unbounded number of keys

2018-07-28 Thread Ashish Pokharel
Andrey, Till,

This doesn’t jibe with what I have noticed (I fully acknowledge that I am still 
getting the hang of the framework). I sent a couple of notes on this in earlier 
threads. 

With this very simple processing, I am running into a slow creep-up of memory 
with unbounded keys, which eventually ends in an OOM. 

DataStream processedData = rawTuples
    .keyBy(PlatformTuple::getKey)
    .window(ProcessingTimeSessionWindows.withGap(
        Time.seconds(AppConfigs.getWindowSize(120))))
    .trigger(new ProcessingTimePurgeTrigger())
    .apply(new MetricWindowFn())
    .name("windowFunctionTuple")
    .map(new TupleToEventMapFn())
    .name("mapTupleEvent");


I initially didn't even have ProcessingTimePurgeTrigger; it was using the 
default Trigger. In an effort to fix this issue, I created my own Trigger from 
the default ProcessingTimeTrigger with a simple override of the onProcessingTime 
method (essentially replacing FIRE with FIRE_AND_PURGE):

@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
    return TriggerResult.FIRE_AND_PURGE;
}

Of course, I can switch to the RocksDB backend, but it feels like we would 
simply be pushing the problem down to storage. We may end up using RocksDB, but 
I wanted to make sure our clean-ups are working properly before doing so. I 
attempted a heap dump as well, and from what I could see, some key-related 
objects seem to be hanging around (perhaps watermarks etc.?). The heap was over 
8GB and hence time-consuming to introspect further. I have been meaning to get 
back to creating the same app with a smaller footprint to scan the heap dump 
better, but haven't had time (especially because the approach below works fine 
and this is not a burning issue; however, reverting to the low-level API is not 
my preference, as we bypass the nice high-level APIs).

If I replace the window function with a ProcessFunction, use its TimerService 
to implement the same logic, and clear state in the onTimer method, this issue 
goes away. @Till’s comment seems to indicate this as a possible solution; is 
that correct? If so, what would be the best way to get this done in the above 
snippet? I would have thought FIRE_AND_PURGE would do the same thing.

Appreciate your pointers on this.

Thanks, Ashish
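
For reference, a minimal Scala sketch of that ProcessFunction approach under 
stated assumptions (hypothetical Metric element type and a sum aggregation 
standing in for the window logic above): buffer elements in keyed state, arm a 
processing-time timer one gap after the latest element, and emit and clear the 
state in onTimer.

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.createTypeInformation
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._

case class Metric(key: String, value: Double) // hypothetical element type

class SessionAggregateFn(gapMillis: Long)
    extends KeyedProcessFunction[String, Metric, (String, Double)] {

  lazy val buffer: ListState[Metric] = getRuntimeContext.getListState(
    new ListStateDescriptor[Metric]("buffer", createTypeInformation[Metric]))

  lazy val gapTimer: ValueState[java.lang.Long] = getRuntimeContext.getState(
    new ValueStateDescriptor[java.lang.Long]("gapTimer", classOf[java.lang.Long]))

  override def processElement(
      m: Metric,
      ctx: KeyedProcessFunction[String, Metric, (String, Double)]#Context,
      out: Collector[(String, Double)]): Unit = {
    buffer.add(m)
    // Extend the session: remember the newest deadline; older timers still
    // fire but are ignored below.
    val fireAt = ctx.timerService().currentProcessingTime() + gapMillis
    ctx.timerService().registerProcessingTimeTimer(fireAt)
    gapTimer.update(fireAt)
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[String, Metric, (String, Double)]#OnTimerContext,
      out: Collector[(String, Double)]): Unit = {
    val armed = gapTimer.value()
    if (armed != null && armed.longValue() == timestamp) { // latest timer only
      val elems = buffer.get().asScala.toList
      if (elems.nonEmpty) out.collect((ctx.getCurrentKey, elems.map(_.value).sum))
      buffer.clear()   // explicitly drop the per-key state
      gapTimer.clear()
    }
  }
}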



Re: Questions on Unbounded number of keys

2018-07-26 Thread Andrey Zagrebin
Hi Chang Liu,

The unbounded nature of the stream, keyed or not, should not by itself lead to 
out of memory. 

Flink's parallel keyed operator instances exist in a fixed number (the 
parallelism), and each just processes some range of keyed elements; in your 
example, a subrange of session IDs. 

The keyed processed elements (HTTP requests) are objects created when they 
enter the pipeline and garbage collected after having been processed in 
streaming fashion. 

If they arrive very rapidly, this can lead to high back pressure between 
upstream and downstream operators; buffers can become full, and the pipeline 
stops or slows down processing external inputs. That usually means your 
pipeline is under-provisioned. 

The only accumulated data comes from state (windows, user state, etc.), so if 
you control its memory consumption, as Till described, there should be no other 
source of out-of-memory errors.

Cheers,
Andrey
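
To make the fixed-number point concrete, a conceptual Scala sketch (simplified; 
Flink's actual logic lives in KeyGroupRangeAssignment and uses murmur hashing) 
of how keys map to a bounded set of operator instances:

// A key is hashed into one of maxParallelism key groups, and each parallel
// operator instance owns a contiguous range of key groups. The number of
// instances is therefore bounded by the parallelism, no matter how many
// distinct keys (e.g. session IDs) ever appear.
def keyGroupFor(key: Any, maxParallelism: Int): Int =
  Math.floorMod(key.hashCode, maxParallelism)

def operatorIndexFor(keyGroup: Int, maxParallelism: Int, parallelism: Int): Int =
  keyGroup * parallelism / maxParallelism

// Example: with maxParallelism = 128 and parallelism = 4, every possible
// session ID lands in one of only 4 operator instances.
val owner = operatorIndexFor(keyGroupFor("session-42", 128), 128, 4)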


Re: Questions on Unbounded number of keys

2018-07-25 Thread Chang Liu
Hi Till,

Thanks for your reply. But I think maybe I did not make my question clear. My 
question is not about whether the States within each keyed operator instance 
will run out of memory. My question is about whether the unlimited number of 
keyed operator instances themselves will run out of memory.

So to reply to your answers: neither using a different State backend nor 
regularly cleaning up the States (which is exactly what I am doing) concerns 
the number of keyed operator instances.

I would like to know:

- Will the number of keyed operator instances (Java objects?) grow unbounded?
- If so, will they run out of memory? This is not actually related to the 
memory used by the keyed State inside.
- If not, then how is Flink managing these multiple keyed operator instances?

I think answering this needs more knowledge about how Flink works internally, 
to understand how keyed operator instances are created, maintained and 
destroyed. That’s why I would like your help understanding this.

Many Thanks.

Best regards/祝好,

Chang Liu 刘畅



Re: Questions on Unbounded number of keys

2018-07-24 Thread Till Rohrmann
Hi Chang Liu,

if you are dealing with an unlimited number of keys and keep state around
for every key, then your state size will keep growing with the number of
keys. If you are using the FsStateBackend, which keeps state in memory, you
will eventually run into an OutOfMemoryException. One way to solve/mitigate
this problem is to use the RocksDBStateBackend, which can go out of core.

Alternatively, you would need to clean up your state before you run out of
memory. One way to do this is to register for every key a timer which
clears the state. But this only works if you don't amass too much state
data before the timer is triggered. If you wish, this solution is some kind
of poor man's state TTL. The Flink community is currently developing a
proper implementation of it which does not rely on additional timers (which
would increase the state footprint) [1].

[1] https://issues.apache.org/jira/browse/FLINK-9510

Cheers,
Till
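
For reference, the state TTL support that later came out of FLINK-9510 (first 
shipped in Flink 1.6) is configured per state descriptor; a rough Scala sketch:

import org.apache.flink.api.common.state.{StateTtlConfig, ValueStateDescriptor}
import org.apache.flink.api.common.time.Time

// Entries expire a fixed time after they were last created/written; expired
// values are never returned and are removed lazily by the backend.
val ttlConfig = StateTtlConfig
  .newBuilder(Time.hours(1))
  .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  .build()

val userId = new ValueStateDescriptor[String]("userId", classOf[String])
userId.enableTimeToLive(ttlConfig)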

On Tue, Jul 24, 2018 at 10:11 AM Chang Liu  wrote:

> Dear All,
>
> I have questions regarding the keys. In general, the questions are:
>
>- What happens if I am doing keyBy based on an unlimited number of keys?
>How is Flink managing each KeyedStream under the hood? Will I get memory
>overflow, for example, if every KeyedStream associated with a specific key
>takes a certain amount of memory?
>- BTW, I think it is fair to say that I have to clear my KeyedState so
>that the memory used by these States is cleaned up regularly. But still, I
>am wondering: even though I am regularly cleaning up State memory, what
>happens to the memory used by the KeyedStream itself, if there is any? And
>will it explode?
>
>
> Let me give an example for understanding it clearly.  Let’s say we have a
>
> val requestStream: DataStream[HttpRequest]
>
> which is a stream of HTTP requests. And by using the session ID as the
> key, we can obtain a KeyedStream per single session, as following:
>
> val streamPerSession: KeyedStream[HttpRequest] =
> requestStream.keyBy(_.sessionId)
>
> However, the session IDs are actually hash codes generated randomly by the
> Web service/application, so that means the number of sessions is
> unlimited (which is reasonable, because every time a user opens the
> application or logs in, he/she will get a new unique session).
>
> Then, the question is: will Flink eventually run out of memory because the
> number of sessions is unlimited (and because we are keying by the session
> ID)?
>
>- If so, how can we properly manage this situation?
>- If not, could you help me understand WHY?
>- Let’s also assume that we are regularly clearing the KeyedState, so
>the memory used by the State will not explode.
>
>
>
> Many Thanks and Looking forward to your reply :)
>
> Best regards/祝好,
>
> Chang Liu 刘畅
>
>
>