Re: [External] Re: Memory Issue When Running Beam On Flink

2020-08-31 Thread David Gogokhiya
I have a very naive question. I know Jan suggested using two successive, 
overlapping fixed windows with an offset as a temporary solution to dedup the 
events. However, I am wondering whether a single fixed window of, let's say, 
1 day followed by a deduplication function would be a good alternative. I 
assume that at the end of the window all the timers will be cleared, which will 
result in missing some of the duplicates (those that fall into different 
windows), but I am OK with that.

My pipeline looks something like the following: 
https://pasteboard.co/JoWL0HP.png

It seemed to work when I tested it, but I wanted to double-check, especially 
considering the following statement taken from the Beam documentation 
(https://beam.apache.org/documentation/programming-guide/#windowing): "If you 
set a windowing function using the Window transform, each element is assigned 
to a window, but the windows are not considered until GroupByKey or Combine 
aggregates across a window and key."
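To make the trade-off concrete, here is a small, self-contained Python model (not Beam code) of a 1-day fixed window followed by deduplication. It assumes, as in Beam's stateful processing, that the dedup state is scoped per key and window, so an ID that repeats on both sides of a window boundary is emitted twice. `dedup_in_fixed_windows` is a hypothetical name used only for illustration.

```python
from collections import defaultdict

DAY = 24 * 60 * 60  # window size in seconds


def dedup_in_fixed_windows(events, window_size):
    """Keep the first occurrence of each event ID within each fixed window.

    `events` is an iterable of (event_id, timestamp_seconds) pairs.
    Seen IDs are tracked per window, mimicking state scoped to a window:
    once a window ends, its state (the IDs it saw) is gone, so duplicates
    that fall into different windows are not detected.
    """
    seen = defaultdict(set)  # window start -> IDs already emitted in that window
    output = []
    for event_id, ts in events:
        window_start = ts - ts % window_size  # tumbling (fixed) window assignment
        if event_id not in seen[window_start]:
            seen[window_start].add(event_id)
            output.append((event_id, ts))
    return output


events = [("a", 100), ("a", 200), ("b", DAY - 10), ("b", DAY + 10)]
print(dedup_in_fixed_windows(events, DAY))
# [('a', 100), ('b', 86390), ('b', 86410)]
```

The second "a" is dropped because it lands in the same window, while the second "b" survives because it lands in the next window, which is the kind of missed duplicate accepted in the question.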

P.S. This is my fifth attempt to post a reply. I hope this one gets 
posted; I'm not sure why my previous emails didn't make it through.


On 2020/08/27 10:28:48, Jan Lukavský  wrote: 
>  > If the user chooses to create a window of 10 years, I'd say it is 
> expected behavior that the state will be kept for as long as this duration.
> 
> State will be kept; the problem is that each key in the window will 
> carry a cleanup timer, although there might be nothing to clear (there 
> is no state to be kept). This suboptimality is related only to these 
> cases, and there is nothing special about global windows there. It is 
> just that other large WindowFns are really rare, but that is a 
> coincidence, not a cause.
> 
> Nevertheless, I'm fine with your proposed solution; we might extend it 
> in the future if we find it useful. :)
> 
> Jan

Re: [External] Re: Memory Issue When Running Beam On Flink

2020-08-27 Thread Jan Lukavský
> If the user chooses to create a window of 10 years, I'd say it is 
> expected behavior that the state will be kept for as long as this duration.


State will be kept; the problem is that each key in the window will 
carry a cleanup timer, although there might be nothing to clear (there 
is no state to be kept). This suboptimality is related only to these 
cases, and there is nothing special about global windows there. It is 
just that other large WindowFns are really rare, but that is a 
coincidence, not a cause.


Nevertheless, I'm fine with your proposed solution; we might extend it 
in the future if we find it useful. :)


Jan

On 8/27/20 12:06 PM, Maximilian Michels wrote:
If the user chooses to create a window of 10 years, I'd say it is 
expected behavior that the state will be kept for as long as this 
duration.


GlobalWindows are different because they represent the default case 
where the user does not even use windowing. I think that warrants 
treating them differently, especially because cleanup simply cannot be 
ensured by the watermark.


It would be possible to combine both approaches, but I'd rather not 
skip the cleanup timer for non-global windows because that could 
easily become the source of another leak. The more pressing issue here 
is the global window, not specific windowing.


-Max


Re: [External] Re: Memory Issue When Running Beam On Flink

2020-08-27 Thread Maximilian Michels
If the user chooses to create a window of 10 years, I'd say it is 
expected behavior that the state will be kept for as long as this duration.


GlobalWindows are different because they represent the default case 
where the user does not even use windowing. I think that warrants treating 
them differently, especially because cleanup simply cannot be ensured 
by the watermark.


It would be possible to combine both approaches, but I'd rather not skip 
the cleanup timer for non-global windows because that could easily 
become the source of another leak. The more pressing issue here is the 
global window, not specific windowing.


-Max

On 26.08.20 10:15, Jan Lukavský wrote:
Window triggering is, AFAIK, an operation specific to GBK. Stateful 
DoFns can have timers set for GC only (as shown in the case of 
deduplication); triggering has no effect there. And yes, if we have 
timers other than GC (any user timers), then we have to have a GC timer 
(because timers are a form of state).


Imagine an (admittedly artificial) example of deduplication in a fixed 
window of 10 years. It would exhibit exactly the same state growth as a 
global window (and 10 years is "almost infinite", right? :)).


Jan


Re: [External] Re: Memory Issue When Running Beam On Flink

2020-08-26 Thread Jan Lukavský
Window triggering is, AFAIK, an operation specific to GBK. Stateful 
DoFns can have timers set for GC only (as shown in the case of 
deduplication); triggering has no effect there. And yes, if we have 
timers other than GC (any user timers), then we have to have a GC timer 
(because timers are a form of state).


Imagine an (admittedly artificial) example of deduplication in a fixed 
window of 10 years. It would exhibit exactly the same state growth as a 
global window (and 10 years is "almost infinite", right? :)).
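A rough Python model of the 10-year example, assuming one GC timer per (key, window) that fires once the watermark passes the window end (allowed lateness is ignored). `pending_gc_timers` is a hypothetical helper, not runner code; it only counts how many cleanup timers are still waiting to fire.

```python
DAY = 24 * 60 * 60
TEN_YEARS = 3650 * DAY


def pending_gc_timers(events, window_size, watermark):
    """Count GC timers that have not fired yet at `watermark`.

    `events` are (key, timestamp_seconds) pairs. Each distinct (key, window)
    gets one GC timer at the window end; `window_size=None` models the
    global window, whose end the watermark never reaches in a streaming job.
    """
    timers = set()
    for key, ts in events:
        if window_size is None:
            window_end = float("inf")
        else:
            window_end = ts - ts % window_size + window_size
        timers.add((key, window_end))
    return sum(1 for _, window_end in timers if window_end > watermark)


# 1,000 distinct keys, each seen once per day for 30 days.
events = [(f"key-{i}", day * DAY + i) for day in range(30) for i in range(1000)]
watermark = 30 * DAY

print(pending_gc_timers(events, DAY, watermark))        # 0
print(pending_gc_timers(events, TEN_YEARS, watermark))  # 1000
print(pending_gc_timers(events, None, watermark))       # 1000
```

With 1-day windows every timer has fired by day 30, while the 10-year window and the global window keep one pending timer per key, whether or not that key holds any state.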


Jan

On 8/26/20 10:01 AM, Maximilian Michels wrote:
The inefficiency described happens if and only if the following two 
conditions are met:


 a) there are many timers per single window (as otherwise they will 
be negligible)


 b) there are many keys which actually contain no state (as otherwise 
the timer would be negligible wrt the state size) 


Each window has to have a timer set; it is unavoidable for the window 
computation to be triggered accordingly. This happens regardless of 
whether we have state associated with the key/window or not. The 
additional cleanup timer is just a side effect and not a concern in my 
opinion. Since window computation is per-key, there is no way around 
this. I don't think skipping the cleanup timer for non-global windows 
without state is a good idea, just to save one cleanup timer, when 
there are already timers created for the window computation.


Now, the global window is different in that respect because we can't 
assume it is going to be triggered for unbounded streams. Thus, it 
makes sense to me to handle it differently by not using triggers but 
cleaning up once a watermark > MAX_TIMESTAMP has been processed.


-Max


Re: [External] Re: Memory Issue When Running Beam On Flink

2020-08-26 Thread Maximilian Michels

The inefficiency described happens if and only if the following two conditions 
are met:

 a) there are many timers per single window (as otherwise they will be 
negligible)

 b) there are many keys which actually contain no state (as otherwise the 
timer would be negligible wrt the state size)


Each window has to have a timer set; it is unavoidable for the window 
computation to be triggered accordingly. This happens regardless of 
whether we have state associated with the key/window or not. The 
additional cleanup timer is just a side effect and not a concern in my 
opinion. Since window computation is per-key, there is no way around 
this. I don't think skipping the cleanup timer for non-global windows 
without state is a good idea, just to save one cleanup timer, when there 
are already timers created for the window computation.


Now, the global window is different in that respect because we can't 
assume it is going to be triggered for unbounded streams. Thus, it makes 
sense to me to handle it differently by not using triggers but cleaning 
up once a watermark > MAX_TIMESTAMP has been processed.
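The global-window special case can be sketched as a toy in-memory model, assuming a hypothetical `END_OF_GLOBAL_WINDOW` constant that stands in for the runner's end-of-global-window timestamp. No per-key cleanup timer is registered; only a watermark beyond that point (e.g. when a bounded input is exhausted) triggers a pass over all keys that clears their state. This is an illustration of the idea, not the Flink runner's implementation.

```python
END_OF_GLOBAL_WINDOW = 2**62  # hypothetical stand-in for the end of the global window


class GlobalWindowKeyedState:
    """Toy keyed state for the global window that needs no per-key GC timers."""

    def __init__(self):
        self.state = {}  # key -> set of values seen for that key

    def process(self, key, value):
        # No cleanup timer is registered here, so each key costs only its state.
        self.state.setdefault(key, set()).add(value)

    def on_watermark(self, watermark):
        # Only a watermark past the end of the global window closes it.
        if watermark > END_OF_GLOBAL_WINDOW:
            for key in list(self.state):  # enumerate all keys ...
                del self.state[key]       # ... and clean up their state


store = GlobalWindowKeyedState()
store.process("k1", "a")
store.process("k2", "b")
store.on_watermark(1_000)                     # ordinary watermark: nothing cleared
print(len(store.state))                       # 2
store.on_watermark(END_OF_GLOBAL_WINDOW + 1)  # final watermark
print(len(store.state))                       # 0
```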


-Max

On 26.08.20 09:20, Jan Lukavský wrote:

On 8/25/20 9:27 PM, Maximilian Michels wrote:

I agree that this probably solves the described issue in the most 
straightforward way, but special handling for global window feels 
weird, as there is really nothing special about global window wrt 
state cleanup. 


Why is special handling for the global window weird? After all, it is 
a special case because the global window normally will only be cleaned 
up when the application terminates.


The inefficiency described happens if and only if the following two 
conditions are met:


  a) there are many timers per single window (as otherwise they will be 
negligible)


  b) there are many keys which actually contain no state (as otherwise 
the timer would be negligible wrt the state size)


It just happens that the global window is by far the most common case 
(might be 98% of cases) that satisfies these two conditions, but there 
are other cases as well (e.g. a long-lasting fixed window). The 
discussed options 2) and 3) are systematic in the sense that option 2) 
cancels property a) and option 3) cancels property b). Exploiting the 
correlation of the global window with these two conditions to solve the 
issue is of course possible, but a little unsystematic, and that's what 
feels 'weird'. :)




It doesn't change anything wrt migration. The timers that were 
already set remain and keep on contributing to the state size.


That's ok, regular timers for non-global windows need to remain set 
and should be persisted. They will be redistributed when scaling up 
and down.


I'm not sure that's a "problem", rather an inefficiency. But we could 
address it by deleting the timers where they are currently set, as 
mentioned previously.


I had imagined that we don't even set these timers for the global 
window. Thus, there is no need to clean them up.


-Max


Re: [External] Re: Memory Issue When Running Beam On Flink

2020-08-26 Thread Jan Lukavský

On 8/25/20 9:27 PM, Maximilian Michels wrote:

I agree that this probably solves the described issue in the most 
straightforward way, but special handling for global window feels 
weird, as there is really nothing special about global window wrt 
state cleanup. 


Why is special handling for the global window weird? After all, it is 
a special case because the global window normally will only be cleaned 
up when the application terminates.


The inefficiency described happens if and only if the following two 
conditions are met:


 a) there are many timers per single window (as otherwise they will be 
negligible)


 b) there are many keys which actually contain no state (as otherwise 
the timer would be negligible wrt the state size)


It just happens that the global window is by far the most common case 
(might be 98% of cases) that satisfies these two conditions, but there 
are other cases as well (e.g. a long-lasting fixed window). The 
discussed options 2) and 3) are systematic in the sense that option 2) 
cancels property a) and option 3) cancels property b). Exploiting the 
correlation of the global window with these two conditions to solve the 
issue is of course possible, but a little unsystematic, and that's what 
feels 'weird'. :)




It doesn't change anything wrt migration. The timers that were 
already set remain and keep on contributing to the state size.


That's ok, regular timers for non-global windows need to remain set 
and should be persisted. They will be redistributed when scaling up 
and down.


I'm not sure that's a "problem", rather an inefficiency. But we could 
address it by deleting the timers where they are currently set, as 
mentioned previously.


I had imagined that we don't even set these timers for the global 
window. Thus, there is no need to clean them up.


-Max

On 25.08.20 09:43, Jan Lukavský wrote:
I agree that this probably solves the described issue in the most 
straightforward way, but special handling for global window feels 
weird, as there is really nothing special about global window wrt 
state cleanup. A solution that handles all windows equally would be 
semantically 'cleaner'. If I try to sum up:


  - option 3) seems best, provided that the isEmpty() lookup is cheap for 
every state backend (e.g. that we do not hit disk multiple times); this 
option is the best for state size wrt timers in all windows


  - option 2) works well for key-aligned windows and also reduces state 
size in all windows


  - option "watermark timer" - solves the issue and is easily implemented, 
but doesn't improve the situation for non-global windows


My conclusion would be: use the watermark timer as a hotfix; if we can 
prove that isEmpty() would be cheap, then use option 3) as the final 
solution, otherwise use 2).


WDYT?
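A minimal Python sketch of the idea behind option 3), under the assumption (not spelled out in this excerpt) that it means dropping the GC timer once every state cell of the key is empty. `KeyWindowState` and its methods are hypothetical names, not Beam or Flink API. The `not self.cells` check plays the role of the isEmpty() lookup whose cost on a real backend such as RocksDB is the open question.

```python
class KeyWindowState:
    """Toy state for one (key, window) pair with an optional GC timer."""

    def __init__(self, window_end):
        self.window_end = window_end
        self.cells = {}       # state name -> value; a missing name means empty
        self.gc_timer = None  # pending cleanup timestamp, or None

    def write(self, name, value):
        self.cells[name] = value
        if self.gc_timer is None:
            self.gc_timer = self.window_end  # register cleanup at the window end

    def clear(self, name):
        self.cells.pop(name, None)
        # isEmpty() over all cells of the key: free for an in-memory dict, but
        # on a disk-backed state backend it may cost one lookup per cell.
        if not self.cells:
            self.gc_timer = None  # no state left, so no cleanup timer is needed


s = KeyWindowState(window_end=100)
s.write("seen", True)
print(s.gc_timer)  # 100
s.clear("seen")
print(s.gc_timer)  # None
```

Keys that never hold state never get a timer in this model, which is how option 3) addresses condition b).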

On 8/25/20 5:48 AM, Thomas Weise wrote:



On Mon, Aug 24, 2020 at 1:50 PM Maximilian Michels wrote:


    I'd suggest a modified option (2) which does not use a timer to
    perform the cleanup (as mentioned, this will cause problems with
    migrating state).


That's a great idea. It's essentially a mix of 1) and 2) for the 
global window only.


It doesn't change anything wrt migration. The timers that 
were already set remain and keep on contributing to the state size.


I'm not sure that's a "problem", rather an inefficiency. But we 
could address it by deleting the timers where they are currently 
set, as mentioned previously.



    Instead, whenever we receive a watermark which closes the global
    window, we enumerate all keys and clean up the associated state.

    This is the cleanest and simplest option.

    -Max

    On 24.08.20 20:47, Thomas Weise wrote:
    >
    > On Mon, Aug 24, 2020 at 11:35 AM Jan Lukavský <je...@seznam.cz> wrote:
    >
    >      > The most general solution would be 3), given it can be
    >      > agnostic to window types and does not assume extra runner
    >      > capabilities.
    >
    >     Agree, 2) is an optimization of that. It might be questionable
    >     whether this is premature optimization, but generally querying
    >     multiple states for each clear operation on any state might be
    >     prohibitive, mostly when the state would be stored in an external
    >     database (in the case of Flink that would be RocksDB).
    >
    > For the use case I'm looking at, we are using the heap state
    > backend. I have not checked RocksDB, but would assume that the
    > incremental cost of isEmpty() for other states under the same key
    > is negligible?
    >
    >      > 3) wouldn't require any state migration.
    >
    >     Actually, it would, as we would (ideally) like to migrate users'
    >     pipelines that already contain timers for the end of the global
    >     window, which might never expire.
    >
    > Good catch. This could potentially be addressed by upgrading the
    > timer in the per-record path.
    >

Re: [External] Re: Memory Issue When Running Beam On Flink

2020-08-25 Thread Maximilian Michels
> I agree that this probably solves the described issue in the most 
> straightforward way, but special handling for global window feels 
> weird, as there is really nothing special about global window wrt 
> state cleanup.


Why is special handling for the global window weird? After all, it is a 
special case because the global window normally will only be cleaned up 
when the application terminates.



> It doesn't change anything wrt migration. The timers that were already 
> set remain and keep on contributing to the state size.


That's ok, regular timers for non-global windows need to remain set and 
should be persisted. They will be redistributed when scaling up and down.



> I'm not sure that's a "problem", rather an inefficiency. But we could 
> address it by deleting the timers where they are currently set, as 
> mentioned previously.


I had imagined that we don't even set these timers for the global 
window. Thus, there is no need to clean them up.


-Max

On 25.08.20 09:43, Jan Lukavský wrote:
I agree that this probably solves the described issue in the most 
straightforward way, but special handling for global window feels weird, 
as there is really nothing special about global window wrt state 
cleanup. A solution that handles all windows equally would be 
semantically 'cleaner'. If I try to sum up:


  - option 3) seems best, provided that the isEmpty() lookup is cheap for
    every state backend (e.g. that we do not hit disk multiple times); this
    option is the best for state size wrt timers in all windows

  - option 2) works well for key-aligned windows and also reduces state
    size in all windows

  - option "watermark timer" solves the issue and is easily implemented, but
    doesn't improve the situation for non-global windows


My conclusion would be: use the watermark timer as a hotfix; if we can
prove that isEmpty() would be cheap, then use option 3) as the final
solution, otherwise use 2).


WDYT?

On 8/25/20 5:48 AM, Thomas Weise wrote:



On Mon, Aug 24, 2020 at 1:50 PM Maximilian Michels wrote:


I'd suggest a modified option (2) which does not use a timer to perform
the cleanup (as mentioned, this will cause problems with migrating state).


That's a great idea. It's essentially a mix of 1) and 2) for the 
global window only.


It doesn't change anything wrt migration. The timers that 
were already set remain and keep on contributing to the state size.


I'm not sure that's a "problem", rather an inefficiency. But we could 
address it by deleting the timers where they are currently set, as 
mentioned previously.



Instead, whenever we receive a watermark which closes the global window,
we enumerate all keys and clean up the associated state.

This is the cleanest and simplest option.

-Max

On 24.08.20 20:47, Thomas Weise wrote:
>
> On Mon, Aug 24, 2020 at 11:35 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>      > The most general solution would be 3), given it can be agnostic
>      > to window types and does not assume extra runner capabilities.
>
>     Agree, 2) is an optimization of that. It might be questionable if this
>     is premature optimization, but generally querying multiple states
>     for each clear operation on any state might be prohibitive, mostly
>     when the state is stored in an external database (in the case of
>     Flink that would be RocksDB).
>
> For the use case I'm looking at, we are using the heap state backend. I
> have not checked RocksDB, but would assume that the incremental cost of
> isEmpty() for other states under the same key is negligible?
>
>      > 3) wouldn't require any state migration.
>
>     Actually, it would, as we would (ideally) like to migrate users'
>     pipelines that already contain timers for the end of the global
>     window, which might never expire.
>
> Good catch. This could potentially be addressed by upgrading the timer
> in the per-record path.
>
>     On 8/24/20 7:44 PM, Thomas Weise wrote:
>>
>>     On Fri, Aug 21, 2020 at 12:32 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>         If there are runners that are unable to efficiently enumerate
>>         keys in state, then there probably isn't a runner-agnostic
>>         solution to this. If we focus on Flink, we can provide a
>>         specific implementation of CleanupTimer, which might then do
>>         anything from the mentioned options. I'd be +1 for option 2)
>>         for key-aligned windows (all currently supported) and option
>>         3) for unaligned windows in the future.
>>
>>     The most general solution would be 3), given it can be agnostic to
>>     window types and does not assume extra 

Re: [External] Re: Memory Issue When Running Beam On Flink

2020-08-24 Thread Maximilian Michels
I'd suggest a modified option (2) which does not use a timer to perform 
the cleanup (as mentioned, this will cause problems with migrating state).


Instead, whenever we receive a watermark which closes the global window,
we enumerate all keys and clean up the associated state.


This is the cleanest and simplest option.
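To make the proposal concrete, here is a minimal sketch as a toy Python model, not Beam or Flink runner code. The class and method names are hypothetical, user state is a plain dict per key, and the end of the global window is simplified to +inf (as far as I know Beam uses a large finite timestamp, which a +inf watermark also passes). It shows the two properties claimed above: adding state registers no per-key timer, and a single watermark check enumerates and clears all keys.

```python
import math
from collections import defaultdict

# Assumption: the end of the global window is modeled as +inf. In Beam the
# global window's max timestamp is a large finite value, but a watermark
# of +inf passes it as well.
END_OF_GLOBAL_WINDOW = math.inf


class GlobalWindowStateCleaner:
    """Hypothetical model: global-window user state with no per-key
    cleanup timers; a single watermark check clears every key."""

    def __init__(self):
        # key -> {state_tag: [values]}
        self.state = defaultdict(lambda: defaultdict(list))

    def add(self, key, tag, value):
        # No cleanup timer is registered here, so timer state does not
        # grow with the number of keys.
        self.state[key][tag].append(value)

    def on_watermark(self, watermark):
        """Clear all keys once the watermark closes the global window.
        Returns the number of keys still holding state."""
        if watermark >= END_OF_GLOBAL_WINDOW:
            for key in list(self.state):  # enumerate all keys
                del self.state[key]
        return len(self.state)


cleaner = GlobalWindowStateCleaner()
for i in range(1000):
    cleaner.add(f"key-{i}", "bag", i)
print(cleaner.on_watermark(10_000))    # 1000: global window still open
print(cleaner.on_watermark(math.inf))  # 0: all keys cleaned up
```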

-Max

On 24.08.20 20:47, Thomas Weise wrote:


On Mon, Aug 24, 2020 at 11:35 AM Jan Lukavský wrote:


 > The most general solution would be 3), given it can be agnostic
 > to window types and does not assume extra runner capabilities.

Agree, 2) is an optimization of that. It might be questionable if this
is premature optimization, but generally querying multiple states
for each clear operation on any state might be prohibitive, mostly
when the state is stored in an external database (in the case of
Flink that would be RocksDB).

For the use case I'm looking at, we are using the heap state backend. I
have not checked RocksDB, but would assume that the incremental cost of
isEmpty() for other states under the same key is negligible?


 > 3) wouldn't require any state migration.

Actually, it would, as we would (ideally) like to migrate users'
pipelines that already contain timers for the end of the global window,
which might never expire.

Good catch. This could potentially be addressed by upgrading the timer
in the per-record path.
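A possible reading of upgrading the timer on the per-record path, sketched as a hypothetical Python model (the names and the +inf stand-in for the end of the global window are assumptions, not runner code): timers restored from a savepoint are kept per key, and whenever a record arrives for a key, its legacy end-of-global-window timer is deleted while other timers are left alone.

```python
import math

# Assumption: the end of the global window is modeled as +inf.
END_OF_GLOBAL_WINDOW = math.inf


class TimerUpgrader:
    """Hypothetical model: drop legacy end-of-global-window cleanup timers
    lazily, whenever a record for the key is processed."""

    def __init__(self, restored_timers):
        # key -> set of timer timestamps restored from a savepoint
        self.timers = {key: set(ts) for key, ts in restored_timers.items()}

    def process(self, key, value):
        timers = self.timers.get(key)
        if timers and END_OF_GLOBAL_WINDOW in timers:
            timers.discard(END_OF_GLOBAL_WINDOW)  # upgrade: delete legacy timer
            if not timers:
                del self.timers[key]
        # ... the regular per-record processing of `value` would follow here

    def legacy_timer_count(self):
        return sum(END_OF_GLOBAL_WINDOW in ts for ts in self.timers.values())


upgrader = TimerUpgrader({"a": {math.inf}, "b": {math.inf}, "c": {math.inf, 60.0}})
upgrader.process("a", "event-1")
upgrader.process("c", "event-2")
print(upgrader.legacy_timer_count())  # 1: key "b" has not seen a record yet
print(upgrader.timers["c"])           # {60.0}: unrelated timers are kept
```

The usage also shows the limitation of this approach: keys that never receive another record keep their legacy timer, so it only shrinks timer state for keys that are still active.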


On 8/24/20 7:44 PM, Thomas Weise wrote:


On Fri, Aug 21, 2020 at 12:32 AM Jan Lukavský <je...@seznam.cz> wrote:

If there are runners that are unable to efficiently enumerate
keys in state, then there probably isn't a runner-agnostic
solution to this. If we focus on Flink, we can provide a
specific implementation of CleanupTimer, which might then do
anything from the mentioned options. I'd be +1 for option 2)
for key-aligned windows (all currently supported) and option
3) for unaligned windows in the future.

The most general solution would be 3), given it can be agnostic to
window types and does not assume extra runner capabilities. It
would require introspecting all user states for a given key on
state.clear. That assumes an efficient implementation of
isEmpty(). If all states are empty (have been cleared), then we
can remove the cleanup timer, and add it back on state.add. I'm
planning to give that a shot (for Flink/portable/streaming) to see
how it performs.
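A minimal sketch of option 3) as described here, written as a hypothetical Python model rather than the BagUserStateHandler code: each key has several named states, state.add (re-)registers the cleanup timer, and state.clear checks whether every state of the key is empty before removing the timer. The all(...) check stands in for the per-state isEmpty() lookups whose cost is the open question.

```python
from collections import defaultdict


class LazyCleanupTimers:
    """Hypothetical model of option 3): a key carries a cleanup timer only
    while at least one of its user states is non-empty."""

    def __init__(self, window_end):
        self.window_end = window_end
        self.states = defaultdict(dict)  # key -> {state_tag: [values]}
        self.timers = {}                 # key -> cleanup timestamp

    def add(self, key, tag, value):
        self.states[key].setdefault(tag, []).append(value)
        # state.add: (re-)register the cleanup timer if the key has none.
        self.timers.setdefault(key, self.window_end)

    def clear(self, key, tag):
        self.states[key].pop(tag, None)
        # state.clear: introspect all states of this key (the isEmpty()
        # check); only when every state is empty is the timer removed.
        if all(not values for values in self.states[key].values()):
            del self.states[key]
            self.timers.pop(key, None)


t = LazyCleanupTimers(window_end=100)
t.add("a", "bag", 1)
t.add("a", "count", 1)
t.add("b", "bag", 2)
print(len(t.timers))    # 2
t.clear("a", "bag")
print("a" in t.timers)  # True: "count" still holds state
t.clear("a", "count")
print("a" in t.timers)  # False: all of a's states are empty
```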

We should also consider how we migrate users from the current
state to any future implementation. In the case of option 2) it
should be possible to do this when the state is loaded from a
savepoint, but I'm not 100% sure about that.

3) wouldn't require any state migration.

Jan

On 8/21/20 6:25 AM, Thomas Weise wrote:

Thanks for the clarification.

Here are a few potential options to address the issue, based
on the discussion so far:

1) Optionally skip cleanup timer for global window
(user-controlled via pipeline option)

2) Instead of setting a cleanup timer for every key, handle
all keys for a given window with a single timer. This would
be runner-specific and depend on whether/how a given
runner supports key enumeration. Flink's keyed state backend
supports enumerating keys for a namespace (Beam window) and
state tag. [1]

3) Set the cleanup timer only when there is actually state
associated with a key. This could be accomplished by
intercepting append and clear in BagUserStateHandler [2] and
adding/removing the timer appropriately.

4) See if TTL support in the runner is applicable; for
Flink, see [3]

[1] https://github.com/apache/flink/blob/release-1.10/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java#L76

[2] https://github.com/apache/beam/blob/release-2.23.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java#L315

[3] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
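Option 2) can be sketched the same way, again as a hypothetical Python model: state is grouped by window (the namespace), only one cleanup timer exists per window, and when it fires the model enumerates the keys in that window and clears them. In a real Flink implementation that enumeration is the role the keyed state backend's per-namespace key enumeration [1] would play. Allowed lateness and state tags are left out for brevity.

```python
import heapq
from collections import defaultdict


class PerWindowCleanup:
    """Hypothetical model of option 2): one cleanup timer per window instead
    of one per key and window. Allowed lateness is ignored for simplicity."""

    def __init__(self):
        # window_end -> {key: [values]}; the window acts as the state namespace
        self.state = defaultdict(dict)
        self.window_timers = []  # min-heap holding one entry per window
        self.registered = set()

    def add(self, key, window_end, value):
        self.state[window_end].setdefault(key, []).append(value)
        if window_end not in self.registered:
            self.registered.add(window_end)
            heapq.heappush(self.window_timers, window_end)

    def advance_watermark(self, watermark):
        """Fire window timers at or before the watermark; each firing
        enumerates the keys in that window and clears them. Returns the
        number of keys cleared."""
        cleared = 0
        while self.window_timers and self.window_timers[0] <= watermark:
            window_end = heapq.heappop(self.window_timers)
            self.registered.discard(window_end)
            cleared += len(self.state.pop(window_end, {}))
        return cleared


c = PerWindowCleanup()
for i in range(1000):
    c.add(f"key-{i}", 60, i)
for i in range(500):
    c.add(f"key-{i}", 120, i)
print(len(c.window_timers))      # 2 timers for 1500 (key, window) pairs
print(c.advance_watermark(60))   # 1000
print(c.advance_watermark(200))  # 500
```

The trade-off matches the discussion: timer state no longer scales with the number of keys, but the runner must be able to enumerate keys per window, which is why this option is runner-specific.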


On Thu, Aug 20, 2020 at 8:08 AM Reuven Lax <re...@google.com> wrote:

Also +1 to what Jan said. Streaming pipelines can process
bounded PCollections on some paths, so the global window
will terminate for those paths. This is also true for the
direct runner tests where PCollections pretend to be
unbounded, but we then advance the watermark to +inf to
terminate the pipeline.

On Thu, Aug 20, 2020 at 8:06 AM Reuven Lax <re...@google.com> wrote:

It is not Dataflow specific, but I think Dataflow is
the only runner that currently 

Re: [External] Re: Memory Issue When Running Beam On Flink

2020-08-15 Thread Maximilian Michels

Awesome! Thanks a lot for the memory profile. A couple of remarks:

a) I can see that there are about 378k keys and each of them sets a timer.
b) Based on the settings for DeduplicatePerKey you posted, you will keep 
track of all keys of the last 30 minutes.


Unless you have far fewer keys, this behavior is to be expected. The
memory sizes for the timer maps do not look particularly high (~12Mb).
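To illustrate point b), here is a toy Python model of per-key deduplication with one expiry timer per key. It is an assumption-laden sketch, not Beam's DeduplicatePerKey implementation (which may differ in detail, e.g. processing time vs. event time): a key is remembered for 30 minutes after it is first seen, and its timer then clears it. With a steady stream of unique ids, the number of live timers levels off at the number of distinct keys seen in the last 30 minutes rather than growing without bound.

```python
import heapq


class DedupModel:
    """Hypothetical model of per-key deduplication with one expiry timer per
    key: a key is remembered for `duration_s` seconds after it is first seen."""

    def __init__(self, duration_s):
        self.duration_s = duration_s
        self.seen = {}         # key -> expiry time (one live timer per key)
        self.expiry_heap = []  # (expiry, key) pairs, i.e. the timer queue

    def _fire_timers(self, now):
        # Each fired timer clears the state for its key.
        while self.expiry_heap and self.expiry_heap[0][0] <= now:
            _, key = heapq.heappop(self.expiry_heap)
            del self.seen[key]

    def process(self, key, now):
        """Return True if the element is emitted, False if dropped."""
        self._fire_timers(now)
        if key in self.seen:
            return False
        expiry = now + self.duration_s
        self.seen[key] = expiry
        heapq.heappush(self.expiry_heap, (expiry, key))
        return True

    def live_timers(self):
        return len(self.seen)


dedup = DedupModel(duration_s=30 * 60)
for t in range(3600):  # one new message id per second for an hour
    dedup.process(f"id-{t}", now=t)
print(dedup.live_timers())  # 1800: only ids of the last 30 minutes
```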


How much memory did you reserve for the task managers?*

-Max

*The image links give me a "504 error".

On 14.08.20 23:29, Catlyn Kong wrote:

Hi!

We're indeed using the RocksDB state backend, so that might be part of
the reason. Due to some security concerns, we might not be able to
provide the full heap dump since we have some custom code paths. But
here's a screenshot from JProfiler:

[Screenshot: Screen Shot 2020-08-14 at 9.10.07 AM.png]
It looks like TimerHeapInternalTimer (instantiated in InternalTimerServiceImpl)
isn't getting garbage collected? As David has mentioned, the pipeline
uses DeduplicatePerKey in Beam 2.22. ProcessConnectionEventFn is a simple
stateless DoFn that just does some logging and emits the events. Is there
any possibility that the timer logic, or the way it's used in the dedupe
ParDo, can cause this leak?


Thanks,
Catlyn

On Tue, Aug 11, 2020 at 7:58 AM Maximilian Michels wrote:


Hi!

Looks like a potential leak, caused by your code or by Beam itself.
Would you be able to supply a heap dump from one of the task managers?
That would greatly help debugging this issue.

-Max

On 07.08.20 00:19, David Gogokhiya wrote:
 > Hi,
 >
 > We recently started using Apache Beam version 2.20.0 running on Flink
 > version 1.9, deployed on Kubernetes, to process unbounded streams of data.
 > However, we noticed that the memory consumed by stateful Beam is
 > steadily increasing over time, with no drops no matter what the current
 > bandwidth is. We were wondering whether this is expected and, if not,
 > what would be the best way to resolve it.
 >
 >
 >       More Context
 >
 > We have the following pipeline that consumes messages from the unbounded
 > stream of data. Later we deduplicate the messages based on a unique
 > message id using the deduplicate function.
 >
 > Since we are using Beam version 2.20.0, we copied the source code of the
 > deduplicate function from version 2.22.0. After that we unmap the tuple,
 > retrieve the necessary data from the message payload and dump the
 > corresponding data into the log.
 >
 >
 > Pipeline:
 >
 >
 > Flink configuration:
 >
 >
 > As we mentioned before, we noticed that the memory usage of the
 > jobmanager and taskmanager pods is steadily increasing with no drops, no
 > matter what the current bandwidth is. We tried allocating more memory,
 > but it seems like no matter how much memory we allocate, it eventually
 > reaches its limit and then it tries to restart itself.
 >
 >
 > Sincerely, David
 >
 >