I have a very naive question. I know Jan suggested using two successive,
overlapping fixed windows with an offset as a temporary solution to dedup the
events. However, I am wondering whether a single fixed window of, let's say,
1 day followed by a deduplicate function is a good alternative. I assume that
at the end of the window all the timers will be cleared, which means some
duplicates (those that straddle a window boundary) will be missed, but I am
OK with that.
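
A rough way to see which duplicates each approach misses is to model it
outside Beam. The sketch below is plain Python, not Beam code: `window_start`
and `dedup_fixed` are hypothetical helpers, timestamps are in minutes, events
are assumed to arrive in order, and deduplication is assumed to be scoped per
key and window.

```python
def window_start(ts, size, offset=0):
    """Start of the fixed window of length `size`, shifted by `offset`, containing `ts`."""
    return ts - (ts - offset) % size


def dedup_fixed(events, size, offset=0):
    """Keep the first (key, ts) event per key and window; drop later ones."""
    seen = set()
    kept = []
    for key, ts in events:
        slot = (key, window_start(ts, size, offset))
        if slot not in seen:
            seen.add(slot)
            kept.append((key, ts))
    return kept


# Key "a" arrives four times; timestamps are minutes.
events = [("a", 28), ("a", 31), ("a", 44), ("a", 46)]

# One pass over 30-minute windows starting at minute 0.
single = dedup_fixed(events, size=30)

# A second pass over 30-minute windows shifted by 15 minutes.
double = dedup_fixed(single, size=30, offset=15)

print(single)
print(double)
```

In this model the single pass lets the event at minute 31 through because it
falls into a new window, and the second, shifted pass drops it. Duplicates
spaced further apart can still get through both passes.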

My pipeline looks something like the following: 
https://pasteboard.co/JoWL0HP.png

It seems to be working when I tested it, but I wanted to double check,
especially considering the following statement taken from the Beam
documentation
(https://beam.apache.org/documentation/programming-guide/#windowing): "If you
set a windowing function using the Window transform, each element is assigned
to a window, but the windows are not considered until GroupByKey or Combine
aggregates across a window and key."

P.S. This is my 5th attempt to post a reply. I hope this one will be
posted; I am not sure why my previous emails didn't make it through.


On 2020/08/27 10:28:48, Jan Lukavský <je...@seznam.cz> wrote: 
>  > If the user chooses to create a window of 10 years, I'd say it is 
> expected behavior that the state will be kept for as long as this duration.
> 
> State will be kept, the problem is that each key in the window will 
> carry a cleanup timer, although there might be nothing to clear (there 
> is no state to be kept). This suboptimality is really related only to 
> these cases and there is nothing special about global windows in there. 
> It is only about that other large windowfns are really rare, but that is 
> a coincidence, not a cause.
> 
> Nevertheless, I'm fine with your proposed solution, we might extend it 
> in the future, if we find it useful. :)
> 
> Jan
> 
> On 8/27/20 12:06 PM, Maximilian Michels wrote:
> > If the user chooses to create a window of 10 years, I'd say it is 
> > expected behavior that the state will be kept for as long as this 
> > duration.
> >
> > GlobalWindows are different because they represent the default case 
> > where the user does not even use windowing. I think it warrants being 
> > treated differently, especially because cleanup simply cannot be 
> > ensured by the watermark.
> >
> > It would be possible to combine both approaches, but I'd rather not 
> > skip the cleanup timer for non-global windows because that could 
> > easily become the source of another leak. The more pressing issue here 
> > is the global window, not specific windowing.
> >
> > -Max
> >
> > On 26.08.20 10:15, Jan Lukavský wrote:
> >> Window triggering is, afaik, an operation that is specific to GBK. 
> >> Stateful DoFns can have (as shown in the case of deduplication) 
> >> timers set for the GC only, triggering has no effect there. And yes, 
> >> if we have other timers than GC (any user timers), then we have to 
> >> have GC timer (because timers are a form of state).
> >>
> >> Imagine an (admittedly artificial) example of deduplication in fixed 
> >> window of 10 years. It would exhibit exactly the same state growth as 
> >> global window (and 10 years is "almost infinite", right? :)).
> >>
> >> Jan
> >>
> >> On 8/26/20 10:01 AM, Maximilian Michels wrote:
> >>>> The inefficiency described happens if and only if the following two 
> >>>> conditions are met:
> >>>>
> >>>>  a) there are many timers per single window (as otherwise they will 
> >>>> be negligible)
> >>>>
> >>>>  b) there are many keys which actually contain no state (as 
> >>>> otherwise the timer would be negligible wrt the state size) 
> >>>
> >>> Each window has to have a timer set, it is unavoidable for the 
> >>> window computation to be triggered accordingly. This happens 
> >>> regardless of whether we have state associated with the key/window 
> >>> or not. The additional cleanup timer is just a side effect and not a 
> >>> concern in my opinion. Since window computation is per-key, there is 
> >>> no way around this. I don't think skipping the cleanup timer for non 
> >>> global windows without state is a good idea, just to save one 
> >>> cleanup timer, when there are already timers created for the window 
> >>> computation.
> >>>
> >>> Now, the global window is different in that respect because we can't 
> >>> assume it is going to be triggered for unbounded streams. Thus, it 
> >>> makes sense to me to handle it differently by not using triggers but 
> >>> cleaning up once a watermark > MAX_TIMESTAMP has been processed.
> >>>
> >>> -Max
> >>>
> >>> On 26.08.20 09:20, Jan Lukavský wrote:
> >>>> On 8/25/20 9:27 PM, Maximilian Michels wrote:
> >>>>
> >>>>>> I agree that this probably solves the described issue in the most 
> >>>>>> straightforward way, but special handling for global window feels 
> >>>>>> weird, as there is really nothing special about global window wrt 
> >>>>>> state cleanup. 
> >>>>>
> >>>>> Why is special handling for the global window weird? After all, it 
> >>>>> is a special case because the global window normally will only be 
> >>>>> cleaned up when the application terminates.
> >>>>
> >>>> The inefficiency described happens if and only if the following two 
> >>>> conditions are met:
> >>>>
> >>>>   a) there are many timers per single window (as otherwise they 
> >>>> will be negligible)
> >>>>
> >>>>   b) there are many keys which actually contain no state (as 
> >>>> otherwise the timer would be negligible wrt the state size)
> >>>>
> >>>> It only happens to be the case that global window is the (by far, 
> >>>> might be 98% cases) most common case that satisfies these two 
> >>>> conditions, but there are other cases as well (e.g. long lasting 
> >>>> fixed window). Discussed options 2) and 3) are systematic in the 
> >>>> sense that option 2) cancels property a) and option 3) property b). 
> >>>> Making use of correlation of global window with these two 
> >>>> conditions to solve the issue is of course possible, but a little 
> >>>> unsystematic and that's what feels 'weird'. :)
> >>>>
> >>>>>
> >>>>>> It doesn't change anything wrt migration. The timers that were 
> >>>>>> already set remain and keep on contributing to the state size.
> >>>>>
> >>>>> That's ok, regular timers for non-global windows need to remain 
> >>>>> set and should be persisted. They will be redistributed when 
> >>>>> scaling up and down.
> >>>>>
> >>>>>> I'm not sure that's a "problem", rather an inefficiency. But we 
> >>>>>> could address it by deleting the timers where they are currently 
> >>>>>> set, as mentioned previously.
> >>>>>
> >>>>> I had imagined that we don't even set these timers for the global 
> >>>>> window. Thus, there is no need to clean them up.
> >>>>>
> >>>>> -Max
> >>>>>
> >>>>> On 25.08.20 09:43, Jan Lukavský wrote:
> >>>>>> I agree that this probably solves the described issue in the most 
> >>>>>> straightforward way, but special handling for global window feels 
> >>>>>> weird, as there is really nothing special about global window wrt 
> >>>>>> state cleanup. A solution that handles all windows equally would 
> >>>>>> be semantically 'cleaner'. If I try to sum up:
> >>>>>>
> >>>>>>   - option 3) seems best, provided that isEmpty() lookup is cheap 
> >>>>>> for every state backend (e.g. that we do not hit disk multiple 
> >>>>>> times), this option is the best for state size wrt timers in all 
> >>>>>> windows
> >>>>>>
> >>>>>>   - option 2) works well for key-aligned windows, also reduces 
> >>>>>> state size in all windows
> >>>>>>
> >>>>>>   - option "watermark timer" - solves issue, easily implemented, 
> >>>>>> but doesn't improve situation for non-global windows
> >>>>>>
> >>>>>> My conclusion would be - use watermark timer as hotfix, if we can 
> >>>>>> prove that isEmpty() would be cheap, then use option 3) as final 
> >>>>>> solution, otherwise use 2).
> >>>>>>
> >>>>>> WDYT?
> >>>>>>
> >>>>>> On 8/25/20 5:48 AM, Thomas Weise wrote:
> >>>>>>>
> >>>>>>>
> >>>>>>> On Mon, Aug 24, 2020 at 1:50 PM Maximilian Michels 
> >>>>>>> <m...@apache.org> wrote:
> >>>>>>>
> >>>>>>>     I'd suggest a modified option (2) which does not use a timer to
> >>>>>>>     perform
> >>>>>>>     the cleanup (as mentioned, this will cause problems with 
> >>>>>>> migrating
> >>>>>>>     state).
> >>>>>>>
> >>>>>>>
> >>>>>>> That's a great idea. It's essentially a mix of 1) and 2) for the 
> >>>>>>> global window only.
> >>>>>>>
> >>>>>>> It doesn't change anything wrt migration. The timers that 
> >>>>>>> were already set remain and keep on contributing to the state size.
> >>>>>>>
> >>>>>>> I'm not sure that's a "problem", rather an inefficiency. But we 
> >>>>>>> could address it by deleting the timers where they are currently 
> >>>>>>> set, as mentioned previously.
> >>>>>>>
> >>>>>>>
> >>>>>>>     Instead, whenever we receive a watermark which closes the 
> >>>>>>> global
> >>>>>>>     window,
> >>>>>>>     we enumerate all keys and cleanup the associated state.
> >>>>>>>
> >>>>>>>     This is the cleanest and simplest option.
> >>>>>>>
> >>>>>>>     -Max
> >>>>>>>
> >>>>>>>     On 24.08.20 20:47, Thomas Weise wrote:
> >>>>>>>     >
> >>>>>>>     > On Mon, Aug 24, 2020 at 11:35 AM Jan Lukavský 
> >>>>>>>     > <je...@seznam.cz> wrote:
> >>>>>>>     >
> >>>>>>>     >      > The most general solution would be 3), given it can be
> >>>>>>>     agnostic
> >>>>>>>     >     to window types and does not assume extra runner 
> >>>>>>> capabilities.
> >>>>>>>     >
> >>>>>>>     >     Agree, 2) is optimization to that. It might be 
> >>>>>>> questionable
> >>>>>>>     if this
> >>>>>>>     >     is premature optimization, but generally querying 
> >>>>>>> multiple
> >>>>>>>     states
> >>>>>>>     >     for each clear operation to any state might be 
> >>>>>>> prohibitive,
> >>>>>>>     mostly
> >>>>>>>     >     when the state would be stored in external database 
> >>>>>>> (in case of
> >>>>>>>     >     Flink that would be RocksDB).
> >>>>>>>     >
> >>>>>>>     > For the use case I'm looking at, we are using the heap state
> >>>>>>>     backend. I
> >>>>>>>     > have not checked the RocksDB, but would assume that 
> >>>>>>> incremental
> >>>>>>>     cost of
> >>>>>>>     > isEmpty() for other states under the same key is negligible?
> >>>>>>>     >
> >>>>>>>     >      > 3) wouldn't require any state migration.
> >>>>>>>     >
> >>>>>>>     >     Actually, it would, as we would (ideally) like to 
> >>>>>>> migrate users'
> >>>>>>>     >     pipelines that already contain timers for the end of 
> >>>>>>> global
> >>>>>>>     window,
> >>>>>>>     >     which might not expire ever.
> >>>>>>>     >
> >>>>>>>     > Good catch. This could potentially be addressed by 
> >>>>>>> upgrading the
> >>>>>>>     timer
> >>>>>>>     > in the per record path.
> >>>>>>>     >
> >>>>>>>     >     On 8/24/20 7:44 PM, Thomas Weise wrote:
> >>>>>>>     >>
> >>>>>>>     >>     On Fri, Aug 21, 2020 at 12:32 AM Jan Lukavský
> >>>>>>>     >>     <je...@seznam.cz> wrote:
> >>>>>>>     >>
> >>>>>>>     >>         If there are runners, that are unable to efficiently
> >>>>>>>     enumerate
> >>>>>>>     >>         keys in state, then there probably isn't a runner 
> >>>>>>> agnostic
> >>>>>>>     >>         solution to this. If we focus on Flink, we can 
> >>>>>>> provide
> >>>>>>>     >>         specific implementation of CleanupTimer, which might
> >>>>>>>     then do
> >>>>>>>     >>         anything from the mentioned options. I'd be +1 for
> >>>>>>>     option 2)
> >>>>>>>     >>         for key-aligned windows (all currently supported) 
> >>>>>>> and
> >>>>>>>     option
> >>>>>>>     >>         3) for unaligned windows in the future.
> >>>>>>>     >>
> >>>>>>>     >>     The most general solution would be 3), given it can be
> >>>>>>>     agnostic to
> >>>>>>>     >>     window types and does not assume extra runner 
> >>>>>>> capabilities. It
> >>>>>>>     >>     would require to introspect all user states for a 
> >>>>>>> given key on
> >>>>>>>     >>     state.clear. That assumes an efficient implementation of
> >>>>>>>     >>     isEmpty(). If all states are empty (have been 
> >>>>>>> cleared), then we
> >>>>>>>     >>     can remove the cleanup timer. And add it back on 
> >>>>>>> state.add. I'm
> >>>>>>>     >>     planning to give that a shot (for 
> >>>>>>> Flink/portable/streaming)
> >>>>>>>     to see
> >>>>>>>     >>     how it performs.
> >>>>>>>     >>
> >>>>>>>     >>         We should also consider how we migrate users from 
> >>>>>>> the
> >>>>>>>     current
> >>>>>>>     >>         state to any future implementation. In case of 
> >>>>>>> option 2) it
> >>>>>>>     >>         should be possible to do this when the state is 
> >>>>>>> loaded from
> >>>>>>>     >>         savepoint, but I'm not 100% sure about that.
> >>>>>>>     >>
> >>>>>>>     >>     3) wouldn't require any state migration.
> >>>>>>>     >>
> >>>>>>>     >>         Jan
> >>>>>>>     >>
> >>>>>>>     >>         On 8/21/20 6:25 AM, Thomas Weise wrote:
> >>>>>>>     >>>         Thanks for the clarification.
> >>>>>>>     >>>
> >>>>>>>     >>>         Here are a few potential options to address the 
> >>>>>>> issue,
> >>>>>>>     based
> >>>>>>>     >>>         on the discussion so far:
> >>>>>>>     >>>
> >>>>>>>     >>>         1) Optionally skip cleanup timer for global window
> >>>>>>>     >>>         (user-controlled via pipeline option)
> >>>>>>>     >>>
> >>>>>>>     >>>         2) Instead of setting a cleanup timer for every 
> >>>>>>> key,
> >>>>>>>     handle
> >>>>>>>     >>>         all keys for a given window with a single timer. 
> >>>>>>> This
> >>>>>>>     would
> >>>>>>>     >>>         be runner specific and depend on if/how a given
> >>>>>>>     >>>         runner supports key enumeration. Flink's keyed 
> >>>>>>> state
> >>>>>>>     backend
> >>>>>>>     >>>         supports enumerating keys for a namespace (Beam
> >>>>>>>     window) and
> >>>>>>>     >>>         state tag. [1]
> >>>>>>>     >>>
> >>>>>>>     >>>         3) Set the cleanup timer only when there is 
> >>>>>>> actually state
> >>>>>>>     >>>         associated with a key. This could be 
> >>>>>>> accomplished by
> >>>>>>>     >>>         intercepting append and clear in 
> >>>>>>> BagUserStateHandler
> >>>>>>>     [2] and
> >>>>>>>     >>>         adding/removing the timer appropriately.
> >>>>>>>     >>>
> >>>>>>>     >>>         4) See if TTL support in the runner is 
> >>>>>>> applicable, for
> >>>>>>>     >>>         Flink see [3]
> >>>>>>>     >>>
> >>>>>>>     >>>         [1]
> >>>>>>>     >>>
> >>>>>>> https://github.com/apache/flink/blob/release-1.10/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java#L76
> >>>>>>>  
> >>>>>>>
> >>>>>>>     >>>
> >>>>>>>     >>>         [2]
> >>>>>>>     >>>
> >>>>>>> https://github.com/apache/beam/blob/release-2.23.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java#L315
> >>>>>>>  
> >>>>>>>
> >>>>>>>     >>>
> >>>>>>>     >>>         [3]
> >>>>>>>     >>>
> >>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
> >>>>>>>  
> >>>>>>>
> >>>>>>>     >>>
> >>>>>>>     >>>
> >>>>>>>     >>>         On Thu, Aug 20, 2020 at 8:08 AM Reuven Lax
> >>>>>>>     >>>         <re...@google.com> wrote:
> >>>>>>>     >>>
> >>>>>>>     >>>             Also +1 to what Jan said. Streaming 
> >>>>>>> pipelines can
> >>>>>>>     process
> >>>>>>>     >>>             bounded PCollections on some paths, so the 
> >>>>>>> global
> >>>>>>>     window
> >>>>>>>     >>>             will terminate for those paths. This is also 
> >>>>>>> true
> >>>>>>>     for the
> >>>>>>>     >>>             direct runner tests where PCollections 
> >>>>>>> pretend to be
> >>>>>>>     >>>             unbounded, but we then advance the watermark
> >>>>>>>     to +inf to
> >>>>>>>     >>>             terminate the pipeline.
> >>>>>>>     >>>
> >>>>>>>     >>>             On Thu, Aug 20, 2020 at 8:06 AM Reuven Lax
> >>>>>>>     >>>             <re...@google.com> wrote:
> >>>>>>>     >>>
> >>>>>>>     >>>                 It is not Dataflow specific, but I think
> >>>>>>>     Dataflow is
> >>>>>>>     >>>                 the only runner that currently implements
> >>>>>>>     >>>                 Drain:
> >>>>>>> https://docs.google.com/document/d/1NExwHlj-2q2WUGhSO4jTu8XGhDPmm3cllSN8IMmWci8/edit
> >>>>>>>
> >>>>>>>     >>>
> >>>>>>>     >>>
> >>>>>>>     >>>                 When a pipeline is drained, all windows 
> >>>>>>> (including
> >>>>>>>     >>>                 global windows) end, and the windows are 
> >>>>>>> processed
> >>>>>>>     (i.e.
> >>>>>>>     >>>                 as if they were fixed windows that 
> >>>>>>> terminated).
> >>>>>>>     >>>                 Currently the easiest way to ensure that 
> >>>>>>> is to
> >>>>>>>     rely
> >>>>>>>     >>>                 on the end-of-window timers for the 
> >>>>>>> global window
> >>>>>>>     >>>                 (alternatives are possible, like issuing a
> >>>>>>>     full-state
> >>>>>>>     >>>                 scan when a pipeline is drained, but 
> >>>>>>> that would be
> >>>>>>>     >>>                 quite a bit more complicated). This is not
> >>>>>>>     >>>                 specifically the GC timer, but rather the
> >>>>>>>     >>>                 end-of-window timer that is needed.
> >>>>>>>     >>>
> >>>>>>>     >>>                 I believe that right now we don't have a 
> >>>>>>> way of
> >>>>>>>     >>>                 deleting timers if there are no elements
> >>>>>>>     buffered for
> >>>>>>>     >>>                 a key (e.g. a key that received a few 
> >>>>>>> elements
> >>>>>>>     that
> >>>>>>>     >>>                 were processed in a trigger and then never
> >>>>>>>     received
> >>>>>>>     >>>                 any more elements). This might be part 
> >>>>>>> of the
> >>>>>>>     problem
> >>>>>>>     >>>                 - large numbers of empty keys with noop 
> >>>>>>> timers
> >>>>>>>     set.
> >>>>>>>     >>>                 It would be nice if there were a way to 
> >>>>>>> detect
> >>>>>>>     this
> >>>>>>>     >>>                 and at least remove the timers for those 
> >>>>>>> empty
> >>>>>>>     keys.
> >>>>>>>     >>>
> >>>>>>>     >>>                 Reuven
> >>>>>>>     >>>
> >>>>>>>     >>>                 On Wed, Aug 19, 2020 at 9:20 PM Thomas Weise
> >>>>>>>     >>>                 <t...@apache.org> wrote:
> >>>>>>>     >>>
> >>>>>>>     >>>
> >>>>>>>     >>>
> >>>>>>>     >>>                     On Wed, Aug 19, 2020 at 9:49 AM Reuven Lax
> >>>>>>>     >>>                     <re...@google.com> wrote:
> >>>>>>>     >>>
> >>>>>>>     >>>                         Skipping the cleanup timer for 
> >>>>>>> the global
> >>>>>>>     >>>                         window will break any sort of drain
> >>>>>>>     >>>  functionality, which relies on having
> >>>>>>>     those
> >>>>>>>     >>>                         timers there. It's also 
> >>>>>>> necessary for
> >>>>>>>     bounded
> >>>>>>>     >>>                         inputs, for the same reason.
> >>>>>>>     >>>
> >>>>>>>     >>>
> >>>>>>>     >>>                     Can you say a bit more about why 
> >>>>>>> this will
> >>>>>>>     break
> >>>>>>>     >>>  drain functionality and bounded inputs? Is
> >>>>>>>     this
> >>>>>>>     >>>                     Dataflow specific? Is it because the 
> >>>>>>> state
> >>>>>>>     would
> >>>>>>>     >>>                     be reused by a subsequent instance 
> >>>>>>> of the
> >>>>>>>     pipeline?
> >>>>>>>     >>>
> >>>>>>>     >>>                     For Flink, the GC timers would be 
> >>>>>>> triggered by
> >>>>>>>     >>>                     the final watermark and that will be 
> >>>>>>> the
> >>>>>>>     end of
> >>>>>>>     >>>                     the streaming job. Launching the 
> >>>>>>> same pipeline
> >>>>>>>     >>>                     again will either be a cold start 
> >>>>>>> with no
> >>>>>>>     >>>                     previous state or a start from
> >>>>>>>     savepoint/checkpoint.
> >>>>>>>     >>>
> >>>>>>>     >>>                     It sounds like for Dataflow there 
> >>>>>>> may be a
> >>>>>>>     need
> >>>>>>>     >>>                     for the user to influence the behavior
> >>>>>>>     while for
> >>>>>>>     >>>                     Flink the GC timers in a global 
> >>>>>>> window are not
> >>>>>>>     >>>                     required.
> >>>>>>>     >>>
> >>>>>>>     >>>
> >>>>>>>     >>>
> >>>>>>>     >>>
> >>>>>>>     >>>
> >>>>>>>     >>>                     On Wed, Aug 19, 2020 at 10:31 AM Reuven Lax
> >>>>>>>     >>>                     <re...@google.com> wrote:
> >>>>>>>     >>>
> >>>>>>>     >>>
> >>>>>>>     >>>
> >>>>>>>     >>>                         On Wed, Aug 19, 2020 at 9:53 AM Steve Niemitz
> >>>>>>>     >>>                         <sniem...@apache.org> wrote:
> >>>>>>>     >>>
> >>>>>>>     >>>                             for what it's worth, 
> >>>>>>> dataflow has the
> >>>>>>>     >>>                             same problem here as well. 
> >>>>>>> We've also
> >>>>>>>     >>>                             worked around it by 
> >>>>>>> (optionally)
> >>>>>>>     >>>                             disabling the cleanup timer 
> >>>>>>> in global
> >>>>>>>     >>>                             windows. But I agree, having
> >>>>>>>     drain then
> >>>>>>>     >>>                             be an unsafe operation is 
> >>>>>>> not great.
> >>>>>>>     >>>
> >>>>>>>     >>>
> >>>>>>>     >>>                         Dataflow does not require the 
> >>>>>>> timers
> >>>>>>>     to be in
> >>>>>>>     >>>                         memory though, so unless the 
> >>>>>>> numbers
> >>>>>>>     get very
> >>>>>>>     >>>                         large (to the point where you 
> >>>>>>> run out
> >>>>>>>     of disk
> >>>>>>>     >>>                         storage storing the timers), it 
> >>>>>>> will not
> >>>>>>>     >>>                         cause your pipelines to fail.
> >>>>>>>     >>>
> >>>>>>>     >>>
> >>>>>>>     >>>                             I think for batch it's less 
> >>>>>>> of an
> >>>>>>>     issue
> >>>>>>>     >>>                             since basically everything 
> >>>>>>> is in the
> >>>>>>>     >>>                             global window anyways, and 
> >>>>>>> batch
> >>>>>>>     >>>                             pipelines run for a fixed 
> >>>>>>> amount
> >>>>>>>     of time
> >>>>>>>     >>>                             on a fixed input source.  For
> >>>>>>>     streaming
> >>>>>>>     >>>  pipelines, it's much easier to run
> >>>>>>>     into
> >>>>>>>     >>>                             this.
> >>>>>>>     >>>
> >>>>>>>     >>>
> >>>>>>>     >>>                             On Wed, Aug 19, 2020 at 12:50 PM Reuven Lax
> >>>>>>>     >>>                             <re...@google.com> wrote:
> >>>>>>>     >>>
> >>>>>>>     >>>  @OnWindowExpiration is a per-key callback.
> >>>>>>>     >>>
> >>>>>>>     >>>                                 On Wed, Aug 19, 2020 at 9:48 AM Luke Cwik
> >>>>>>>     >>>                                 <lc...@google.com> wrote:
> >>>>>>>     >>>
> >>>>>>>     >>>  With the addition of @OnWindowExpiration, a single timer
> >>>>>>>     >>>  across keys optimization would still make sense.
> >>>>>>>     >>>
> >>>>>>>     >>>  On Wed, Aug 19, 2020 at 8:51 AM Thomas Weise
> >>>>>>>     >>>  <t...@apache.org> wrote:
> >>>>>>>     >>>
> >>>>>>>     >>> https://issues.apache.org/jira/browse/BEAM-10760
> >>>>>>>     >>>
> >>>>>>>     >>>  I confirmed that skipping the cleanup timers resolves the
> >>>>>>>     >>>  state leak that we observe in the pipeline that uses a
> >>>>>>>     >>>  global window.
> >>>>>>>     >>>
> >>>>>>>     >>>  @Luke the GC is key partitioned and relies on
> >>>>>>>     >>>  StateInternals. That makes it impractical to have a single
> >>>>>>>     >>>  timer that performs cleanup for multiple keys, at least in
> >>>>>>>     >>>  a runner agnostic way.
> >>>>>>>     >>>
> >>>>>>>     >>>  I would like to take a look if there is a need to have the
> >>>>>>>     >>>  GC timer for a global window to start with. Since the
> >>>>>>>     >>>  pipeline terminates, the runner discards all state anyways
> >>>>>>>     >>>  - at least in the case of Flink.
> >>>>>>>     >>>
> >>>>>>>     >>>  Thomas
> >>>>>>>     >>>
> >>>>>>>     >>>  On Mon, Aug 17, 2020 at 9:46 AM Luke Cwik
> >>>>>>>     >>>  <lc...@google.com> wrote:
> >>>>>>>     >>>
> >>>>>>>     >>>  For the cleanup timer.
> >>>>>>>     >>>
> >>>>>>>     >>>  On Mon, Aug 17, 2020 at 9:45 AM Luke Cwik
> >>>>>>>     >>>  <lc...@google.com> wrote:
> >>>>>>>     >>>
> >>>>>>>     >>>  Replacing a timer for each key with just one timer for all
> >>>>>>>     >>>  keys would make sense for the global window.
> >>>>>>>     >>>
> >>>>>>>     >>>  On Sun, Aug 16, 2020 at 5:54 PM Thomas Weise
> >>>>>>>     >>>  <t...@apache.org> wrote:
> >>>>>>>     >>>
> >>>>>>>     >>>      Thanks Jan. We observe a similar issue with state size
> >>>>>>>     >>>      growth in global window (with the portable runner). We
> >>>>>>>     >>>      don't see this issue with non-global windows, there
> >>>>>>>     >>>      does not appear to be any residual. I will take a look
> >>>>>>>     >>>      at skipping the cleanup timers for global window and
> >>>>>>>     >>>      see if that resolves the issue. These timers lead to
> >>>>>>>     >>>      potentially unbounded state growth and don't really
> >>>>>>>     >>>      serve a purpose.
> >>>>>>>     >>>
> >>>>>>>     >>>      Thomas
> >>>>>>>     >>>
> >>>>>>>     >>>      On Sun, Aug 16, 2020 at 1:16 AM Jan Lukavský
> >>>>>>>     >>>      <je...@seznam.cz> wrote:
> >>>>>>>     >>>
> >>>>>>>     >>>          Hi Catlyn,
> >>>>>>>     >>>
> >>>>>>>     >>>          if you use
> >>>>>>>     >>>          global window
> >>>>>>>     >>>          to perform
> >>>>>>>     >>>          the
> >>>>>>>     >>>          deduplication, then
> >>>>>>>     >>>          it should be
> >>>>>>>     >>>          expected to
> >>>>>>>     >>>          have as many
> >>>>>>>     >>>          timers as
> >>>>>>>     >>>          there are
> >>>>>>>     >>>          unique keys +
> >>>>>>>     >>>          one timer for
> >>>>>>>     >>>          each key that
> >>>>>>>     >>>          arrived
> >>>>>>>     >>>          during the
> >>>>>>>     >>>          last 30
> >>>>>>>     >>>          minutes
> >>>>>>>     >>>          (because
> >>>>>>>     >>>          there is
> >>>>>>>     >>>          timer set to
> >>>>>>>     >>>          clear the
> >>>>>>>     >>>          state in the
> >>>>>>>     >>>          deduplication
> >>>>>>>     >>>          function).
> >>>>>>>     >>>          The reason
> >>>>>>>     >>>          for that is
> >>>>>>>     >>>          that Beam
> >>>>>>>     >>>          creates timer
> >>>>>>>     >>>          for window
> >>>>>>>     >>>          garbage
> >>>>>>>     >>>          collection
> >>>>>>>     >>>          time to clear
> >>>>>>>     >>>          state (see
> >>>>>>>     >>>          [1]). If it
> >>>>>>>     >>>          is global
> >>>>>>>     >>>          window, then
> >>>>>>>     >>>          each key will
> >>>>>>>     >>>          have
> >>>>>>>     >>>          associated
> >>>>>>>     >>>          timer forever
> >>>>>>>     >>>          (it might
> >>>>>>>     >>>          open question
> >>>>>>>     >>>          if it makes
> >>>>>>>     >>>          sense in this
> >>>>>>>     >>>          case, or if
> >>>>>>>     >>>          Beam can do
> >>>>>>>     >>>          any better).
> >>>>>>>     >>>
> >>>>>>>     >>>          As I wrote
> >>>>>>>     >>>          before, it
> >>>>>>>     >>>          would
> >>>>>>>     >>>          probably help
> >>>>>>>     >>>          to use two
> >>>>>>>     >>>          deduplications in
> >>>>>>>     >>>          two
> >>>>>>>     >>>          successive
> >>>>>>>     >>>          fixed windows
> >>>>>>>     >>>          of length 30
> >>>>>>>     >>>          minutes,
> >>>>>>>     >>>          shifted by 15
> >>>>>>>     >>>          minutes
> >>>>>>>     >>>          (FixedWindows.of(30
> >>>>>>>     >>>          minutes).withOffset(15
> >>>>>>>     >>>          minutes)), so
> >>>>>>>     >>>          that the two
> >>>>>>>     >>>          windows
> >>>>>>>     >>>          overlap and
> >>>>>>>     >>>          catch
> >>>>>>>     >>>          duplicates
> >>>>>>>     >>>          that would
> >>>>>>>     >>>          appear near
> >>>>>>>     >>>          boundary of
> >>>>>>>     >>>          the first window.
> >>>>>>>     >>>
> >>>>>>>     >>>          @Max, do you
> >>>>>>>     >>>          think it
> >>>>>>>     >>>          would be
> >>>>>>>     >>>          possible to
> >>>>>>>     >>>          schedule the
> >>>>>>>     >>>          cleanup timer
> >>>>>>>     >>>          only when
> >>>>>>>     >>>          there is
> >>>>>>>     >>>          actually data
> >>>>>>>     >>>          in state for
> >>>>>>>     >>>          given key?
> >>>>>>>     >>>          The timer
> >>>>>>>     >>>          would be
> >>>>>>>     >>>          cleared on
> >>>>>>>     >>>          call to
> >>>>>>>     >>>          `clear()`,
> >>>>>>>     >>>          but would
> >>>>>>>     >>>          have to be
> >>>>>>>     >>>          set on every
> >>>>>>>     >>>          write. Or
> >>>>>>>     >>>          would it make
> >>>>>>>     >>>          sense not to
> >>>>>>>     >>>          schedule the
> >>>>>>>     >>>          cleanup timer
> >>>>>>>     >>>          for the global
> >>>>>>>     >>>          window at all?
> >>>>>>>     >>>
> >>>>>>>     >>>          Jan
> >>>>>>>     >>>
> >>>>>>>     >>>          [1]
> >>>>>>>     >>>
> >>>>>>> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java#L334
> >>>>>>>  
> >>>>>>>
> >>>>>>>     >>>
> >>>>>>>     >>>          On 8/15/20
> >>>>>>>     >>>          5:47 PM,
> >>>>>>>     >>>          Catlyn Kong
> >>>>>>>     >>>          wrote:
> >>>>>>>     >>>>              Hi!
> >>>>>>>     >>>>
> >>>>>>>     >>>>              Thanks for
> >>>>>>>     >>>>              the
> >>>>>>>     >>>>              explanation!
> >>>>>>>     >>>>              The
> >>>>>>>     >>>>              screenshot
> >>>>>>>     >>>>              actually
> >>>>>>>     >>>>              shows all
> >>>>>>>     >>>>              the new
> >>>>>>>     >>>>              instances
> >>>>>>>     >>>>              between
> >>>>>>>     >>>>              marking the
> >>>>>>>     >>>>              heap and
> >>>>>>>     >>>>              taking a
> >>>>>>>     >>>>              heap dump,
> >>>>>>>     >>>>              so sorry if
> >>>>>>>     >>>>              that's a
> >>>>>>>     >>>>              little
> >>>>>>>     >>>>              confusing.
> >>>>>>>     >>>>              Here's what
> >>>>>>>     >>>>              the full
> >>>>>>>     >>>>              heap looks like:
> >>>>>>>     >>>>              Screen Shot
> >>>>>>>     >>>>              2020-08-15
> >>>>>>>     >>>>              at 8.31.42
> >>>>>>>     >>>>              AM.png
> >>>>>>>     >>>>              Our input
> >>>>>>>     >>>>              stream has
> >>>>>>>     >>>>              roughly 50
> >>>>>>>     >>>>              messages per
> >>>>>>>     >>>>              second and
> >>>>>>>     >>>>              the pipeline
> >>>>>>>     >>>>              has been
> >>>>>>>     >>>>              running for
> >>>>>>>     >>>>              about 24
> >>>>>>>     >>>>              hours. Even
> >>>>>>>     >>>>              assuming all
> >>>>>>>     >>>>              the messages
> >>>>>>>     >>>>              are unique,
> >>>>>>>     >>>>              5.5 million
> >>>>>>>     >>>>              timers is
> >>>>>>>     >>>>              still very
> >>>>>>>     >>>>              surprising.
> >>>>>>>     >>>>
> >>>>>>>     >>>>              We're
> >>>>>>>     >>>>              allocating
> >>>>>>>     >>>>              11G for
> >>>>>>>     >>>>              taskmanager JVM
> >>>>>>>     >>>>              heap, but it
> >>>>>>>     >>>>              eventually
> >>>>>>>     >>>>              gets filled
> >>>>>>>     >>>>              up (after
> >>>>>>>     >>>>              a couple of days)
> >>>>>>>     >>>>              and the
> >>>>>>>     >>>>              cluster ends
> >>>>>>>     >>>>              up in a bad
> >>>>>>>     >>>>              state.
> >>>>>>>     >>>>              Here's a
> >>>>>>>     >>>>              screenshot
> >>>>>>>     >>>>              of the heap
> >>>>>>>     >>>>              size over
> >>>>>>>     >>>>              the past 24h:
> >>>>>>>     >>>>              Screen Shot
> >>>>>>>     >>>>              2020-08-15
> >>>>>>>     >>>>              at 8.41.48
> >>>>>>>     >>>>              AM.png
> >>>>>>>     >>>>
> >>>>>>>     >>>>              Could it be
> >>>>>>>     >>>>              that the
> >>>>>>>     >>>>              timers never
> >>>>>>>     >>>>              got cleared
> >>>>>>>     >>>>              out or maybe
> >>>>>>>     >>>>              the pipeline
> >>>>>>>     >>>>              is creating
> >>>>>>>     >>>>              more
> >>>>>>>     >>>>              timer instances
> >>>>>>>     >>>>              than expected?
> >>>>>>>     >>>>
> >>>>>>>     >>>>              On Sat, Aug
> >>>>>>>     >>>>              15, 2020 at
> >>>>>>>     >>>>              4:07 AM
> >>>>>>>     >>>>              Maximilian
> >>>>>>>     >>>>              Michels
> >>>>>>>     >>>>              <m...@apache.org>
> >>>>>>>     >>>>              wrote:
> >>>>>>>     >>>>
> >>>>>>>     >>>>                  Awesome!
> >>>>>>>     >>>>                  Thanks a
> >>>>>>>     >>>>                  lot for
> >>>>>>>     >>>>                  the
> >>>>>>>     >>>>                  memory
> >>>>>>>     >>>>                  profile.
> >>>>>>>     >>>>                  Couple
> >>>>>>>     >>>>                  remarks:
> >>>>>>>     >>>>
> >>>>>>>     >>>>                  a) I can
> >>>>>>>     >>>>                  see that
> >>>>>>>     >>>>                  there
> >>>>>>>     >>>>                  are
> >>>>>>>     >>>>                  about
> >>>>>>>     >>>>                  378k
> >>>>>>>     >>>>                  keys and
> >>>>>>>     >>>>                  each of
> >>>>>>>     >>>>                  them
> >>>>>>>     >>>>                  sets a
> >>>>>>>     >>>>                  timer.
> >>>>>>>     >>>>                  b) Based
> >>>>>>>     >>>>                  on the
> >>>>>>>     >>>>                  settings
> >>>>>>>     >>>>                  for
> >>>>>>>     >>>>  DeduplicatePerKey
> >>>>>>>     >>>>                  you
> >>>>>>>     >>>>                  posted,
> >>>>>>>     >>>>                  you will
> >>>>>>>     >>>>                  keep
> >>>>>>>     >>>>                  track of
> >>>>>>>     >>>>                  all keys
> >>>>>>>     >>>>                  of the
> >>>>>>>     >>>>                  last 30
> >>>>>>>     >>>>                  minutes.
> >>>>>>>     >>>>
> >>>>>>>     >>>>                  Unless
> >>>>>>>     >>>>                  you have
> >>>>>>>     >>>>                  much
> >>>>>>>     >>>>                  fewer
> >>>>>>>     >>>>                  keys,
> >>>>>>>     >>>>                  the
> >>>>>>>     >>>>                  behavior
> >>>>>>>     >>>>                  is to be
> >>>>>>>     >>>>                  expected. The
> >>>>>>>     >>>>
> >>>>>>>     >>>>                  memory
> >>>>>>>     >>>>                  sizes
> >>>>>>>     >>>>                  for the
> >>>>>>>     >>>>                  timer
> >>>>>>>     >>>>                  maps do
> >>>>>>>     >>>>                  not look
> >>>>>>>     >>>>                  particularly
> >>>>>>>     >>>>                  high
> >>>>>>>     >>>>                  (~12 MB).
> >>>>>>>     >>>>
> >>>>>>>     >>>>                  How much
> >>>>>>>     >>>>                  memory
> >>>>>>>     >>>>                  did you
> >>>>>>>     >>>>                  reserve
> >>>>>>>     >>>>                  for the
> >>>>>>>     >>>>                  task
> >>>>>>>     >>>>                  managers?*
> >>>>>>>     >>>>
> >>>>>>>     >>>>                  -Max
> >>>>>>>     >>>>
> >>>>>>>     >>>>                  *The
> >>>>>>>     >>>>                  image
> >>>>>>>     >>>>                  links
> >>>>>>>     >>>>                  give me
> >>>>>>>     >>>>                  a "504
> >>>>>>>     >>>>                  error".
> >>>>>>>     >>>>
> >>>>>>>     >>>>                  On
> >>>>>>>     >>>>                  14.08.20
> >>>>>>>     >>>>                  23:29,
> >>>>>>>     >>>>                  Catlyn
> >>>>>>>     >>>>                  Kong wrote:
> >>>>>>>     >>>>                  > Hi!
> >>>>>>>     >>>>                  >
> >>>>>>>     >>>>                  > We're
> >>>>>>>     >>>>                  indeed
> >>>>>>>     >>>>                  using
> >>>>>>>     >>>>                  the
> >>>>>>>     >>>>                  RocksDB
> >>>>>>>     >>>>                  state
> >>>>>>>     >>>>                  backend,
> >>>>>>>     >>>>                  so that
> >>>>>>>     >>>>                  might be
> >>>>>>>     >>>>                  part of
> >>>>>>>     >>>>                  > the
> >>>>>>>     >>>>                  reason.
> >>>>>>>     >>>>                  Due to
> >>>>>>>     >>>>                  some
> >>>>>>>     >>>>                  security
> >>>>>>>     >>>>                  concerns, we
> >>>>>>>     >>>>                  might
> >>>>>>>     >>>>                  not be
> >>>>>>>     >>>>                  able to
> >>>>>>>     >>>>                  >
> >>>>>>>     >>>>                  provide
> >>>>>>>     >>>>                  the full
> >>>>>>>     >>>>                  heap
> >>>>>>>     >>>>                  dump
> >>>>>>>     >>>>                  since we
> >>>>>>>     >>>>                  have
> >>>>>>>     >>>>                  some
> >>>>>>>     >>>>                  custom
> >>>>>>>     >>>>                  code
> >>>>>>>     >>>>                  path. But
> >>>>>>>     >>>>                  > here's
> >>>>>>>     >>>>                  a
> >>>>>>>     >>>>                  screenshot
> >>>>>>>     >>>>                  from
> >>>>>>>     >>>>                  JProfiler:
> >>>>>>>     >>>>                  > Screen
> >>>>>>>     >>>>                  Shot
> >>>>>>>     >>>>                  2020-08-14
> >>>>>>>     >>>>                  at
> >>>>>>>     >>>>                  9.10.07
> >>>>>>>     >>>>                  AM.png
> >>>>>>>     >>>>                  > Looks
> >>>>>>>     >>>>                  like
> >>>>>>>     >>>>  TimerHeapInternalTimer
> >>>>>>>     >>>>                  (initiated
> >>>>>>>     >>>>                  in
> >>>>>>>     >>>>  InternalTimerServiceImpl
> >>>>>>>     >>>>
> >>>>>>>     >>>>                  >
> >>>>>>>     >>>> 
> >>>>>>>  
> >>>>>>> <https://github.com/apache/flink/blob/5125b1123dfcfff73b5070401dfccb162959080c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L46>)
> >>>>>>>  
> >>>>>>>
> >>>>>>>     >>>>
> >>>>>>>     >>>>                  > isn't
> >>>>>>>     >>>>                  getting
> >>>>>>>     >>>>                  garbage
> >>>>>>>     >>>>                  collected?
> >>>>>>>     >>>>                  As David
> >>>>>>>     >>>>                  has
> >>>>>>>     >>>>                  mentioned the
> >>>>>>>     >>>>                  pipeline
> >>>>>>>     >>>>                  > uses
> >>>>>>>     >>>>  DeduplicatePerKey
> >>>>>>>     >>>>
> >>>>>>>     >>>>                  >
> >>>>>>>     >>>> 
> >>>>>>>  
> >>>>>>> <https://beam.apache.org/releases/pydoc/2.22.0/_modules/apache_beam/transforms/deduplicate.html#DeduplicatePerKey>
> >>>>>>>  in 
> >>>>>>>
> >>>>>>>     >>>>
> >>>>>>>     >>>>                  > Beam
> >>>>>>>     >>>>                  2.22,
> >>>>>>>     >>>>  ProcessConnectionEventFn
> >>>>>>>     >>>>                  is a
> >>>>>>>     >>>>                  simple stateless
> >>>>>>>     >>>>                  DoFn
> >>>>>>>     >>>>                  that just
> >>>>>>>     >>>>                  > does
> >>>>>>>     >>>>                  some
> >>>>>>>     >>>>                  logging
> >>>>>>>     >>>>                  and
> >>>>>>>     >>>>                  emits
> >>>>>>>     >>>>                  the
> >>>>>>>     >>>>                  events.
> >>>>>>>     >>>>                  Is there
> >>>>>>>     >>>>                  any
> >>>>>>>     >>>>                  possibility
> >>>>>>>     >>>>                  that
> >>>>>>>     >>>>                  > the
> >>>>>>>     >>>>                  timer
> >>>>>>>     >>>>                  logic or
> >>>>>>>     >>>>                  the way
> >>>>>>>     >>>>                  it's
> >>>>>>>     >>>>                  used in
> >>>>>>>     >>>>                  the
> >>>>>>>     >>>>                  dedupe
> >>>>>>>     >>>>                  ParDo
> >>>>>>>     >>>>                  can
> >>>>>>>     >>>>                  cause this
> >>>>>>>     >>>>                  > leak?
> >>>>>>>     >>>>                  >
> >>>>>>>     >>>>                  > Thanks,
> >>>>>>>     >>>>                  > Catlyn
> >>>>>>>     >>>>                  >
> >>>>>>>     >>>>                  > On
> >>>>>>>     >>>>                  Tue, Aug
> >>>>>>>     >>>>                  11, 2020
> >>>>>>>     >>>>                  at 7:58
> >>>>>>>     >>>>                  AM
> >>>>>>>     >>>>                  Maximilian
> >>>>>>>     >>>>                  Michels
> >>>>>>>     >>>>                  <m...@apache.org>
> >>>>>>>     >>>>                  wrote:
> >>>>>>>     >>>>                  >
> >>>>>>>     >>>>                  >     Hi!
> >>>>>>>     >>>>                  >
> >>>>>>>     >>>>                  >
> >>>>>>>     >>>>                   Looks
> >>>>>>>     >>>>                  like a
> >>>>>>>     >>>>                  potential leak,
> >>>>>>>     >>>>                  caused
> >>>>>>>     >>>>                  by your
> >>>>>>>     >>>>                  code or
> >>>>>>>     >>>>                  by Beam
> >>>>>>>     >>>>                  itself.
> >>>>>>>     >>>>                  >
> >>>>>>>     >>>>                   Would
> >>>>>>>     >>>>                  you be
> >>>>>>>     >>>>                  able to
> >>>>>>>     >>>>                  supply a
> >>>>>>>     >>>>                  heap
> >>>>>>>     >>>>                  dump
> >>>>>>>     >>>>                  from one
> >>>>>>>     >>>>                  of the
> >>>>>>>     >>>>                  task
> >>>>>>>     >>>>                  managers?
> >>>>>>>     >>>>                  >
> >>>>>>>     >>>>                   That
> >>>>>>>     >>>>                  would
> >>>>>>>     >>>>                  greatly
> >>>>>>>     >>>>                  help
> >>>>>>>     >>>>                  debugging this
> >>>>>>>     >>>>                  issue.
> >>>>>>>     >>>>                  >
> >>>>>>>     >>>>                  >     -Max
> >>>>>>>     >>>>                  >
> >>>>>>>     >>>>                  >     On
> >>>>>>>     >>>>                  07.08.20
> >>>>>>>     >>>>                  00:19,
> >>>>>>>     >>>>                  David
> >>>>>>>     >>>>                  Gogokhiya wrote:
> >>>>>>>     >>>>                  >      > Hi,
> >>>>>>>     >>>>                  >      >
> >>>>>>>     >>>>                  >      >
> >>>>>>>     >>>>                  We
> >>>>>>>     >>>>                  recently
> >>>>>>>     >>>>                  started
> >>>>>>>     >>>>                  using
> >>>>>>>     >>>>                  Apache
> >>>>>>>     >>>>                  Beam
> >>>>>>>     >>>>                  version
> >>>>>>>     >>>>                  2.20.0
> >>>>>>>     >>>>                  running on
> >>>>>>>     >>>>                  >     Flink
> >>>>>>>     >>>>                  >      >
> >>>>>>>     >>>>                  version
> >>>>>>>     >>>>                  1.9
> >>>>>>>     >>>>                  deployed
> >>>>>>>     >>>>                  on
> >>>>>>>     >>>>                  Kubernetes
> >>>>>>>     >>>>                  to
> >>>>>>>     >>>>                  process
> >>>>>>>     >>>>                  unbounded streams
> >>>>>>>     >>>>                  >     of
> >>>>>>>     >>>>                  data.
> >>>>>>>     >>>>                  >      >
> >>>>>>>     >>>>                  However,
> >>>>>>>     >>>>                  we
> >>>>>>>     >>>>                  noticed
> >>>>>>>     >>>>                  that the
> >>>>>>>     >>>>                  memory
> >>>>>>>     >>>>                  consumed
> >>>>>>>     >>>>                  by
> >>>>>>>     >>>>                  stateful
> >>>>>>>     >>>>                  Beam is
> >>>>>>>     >>>>                  >      >
> >>>>>>>     >>>>                  steadily
> >>>>>>>     >>>>                  increasing
> >>>>>>>     >>>>                  over
> >>>>>>>     >>>>                  time
> >>>>>>>     >>>>                  with no
> >>>>>>>     >>>>                  drops no
> >>>>>>>     >>>>                  matter
> >>>>>>>     >>>>                  what the
> >>>>>>>     >>>>                  >  current
> >>>>>>>     >>>>                  >      >
> >>>>>>>     >>>>                  bandwidth is.
> >>>>>>>     >>>>                  We were
> >>>>>>>     >>>>                  wondering if
> >>>>>>>     >>>>                  this is
> >>>>>>>     >>>>                  expected
> >>>>>>>     >>>>                  and if
> >>>>>>>     >>>>                  not what
> >>>>>>>     >>>>                  >      >
> >>>>>>>     >>>>                  would be
> >>>>>>>     >>>>                  the best
> >>>>>>>     >>>>                  way to
> >>>>>>>     >>>>                  resolve it.
> >>>>>>>     >>>>                  >      >
> >>>>>>>     >>>>                  >      >
> >>>>>>>     >>>>                  > >
> >>>>>>>     >>>>                   More
> >>>>>>>     >>>>                  Context
> >>>>>>>     >>>>                  >      >
> >>>>>>>     >>>>                  >      >
> >>>>>>>     >>>>                  We have
> >>>>>>>     >>>>                  the
> >>>>>>>     >>>>                  following pipeline
> >>>>>>>     >>>>                  that
> >>>>>>>     >>>>                  consumes
> >>>>>>>     >>>>                  messages
> >>>>>>>     >>>>                  from the
> >>>>>>>     >>>>                  >  unbounded
> >>>>>>>     >>>>                  >      >
> >>>>>>>     >>>>                  stream
> >>>>>>>     >>>>                  of data.
> >>>>>>>     >>>>                  Later we
> >>>>>>>     >>>>                  deduplicate
> >>>>>>>     >>>>                  the
> >>>>>>>     >>>>                  messages
> >>>>>>>     >>>>                  based on
> >>>>>>>     >>>>                  unique
> >>>>>>>     >>>>                  >      >
> >>>>>>>     >>>>                  message
> >>>>>>>     >>>>                  id using
> >>>>>>>     >>>>                  the
> >>>>>>>     >>>>                  deduplicate
> >>>>>>>     >>>>                  function
> >>>>>>>     >>>>                  >      >
> >>>>>>>     >>>>                  >
> >>>>>>>     >>>> 
> >>>>>>>   
> >>>>>>> <https://beam.apache.org/releases/pydoc/2.22.0/_modules/apache_beam/transforms/deduplicate.html#DeduplicatePerKey>.
> >>>>>>>  
> >>>>>>>
> >>>>>>>     >>>>                  >
> >>>>>>>     >>>>                  >      >
> >>>>>>>     >>>>                  Since we
> >>>>>>>     >>>>                  are
> >>>>>>>     >>>>                  using
> >>>>>>>     >>>>                  Beam
> >>>>>>>     >>>>                  version
> >>>>>>>     >>>>                  2.20.0,
> >>>>>>>     >>>>                  we
> >>>>>>>     >>>>                  copied
> >>>>>>>     >>>>                  the
> >>>>>>>     >>>>                  source code
> >>>>>>>     >>>>                  >     of the
> >>>>>>>     >>>>                  >      >
> >>>>>>>     >>>>                  deduplicate
> >>>>>>>     >>>>                  function
> >>>>>>>     >>>>                  >      >
> >>>>>>>     >>>>                  >
> >>>>>>>     >>>> 
> >>>>>>>   
> >>>>>>> <https://beam.apache.org/releases/pydoc/2.22.0/_modules/apache_beam/transforms/deduplicate.html#DeduplicatePerKey> from
> >>>>>>>  
> >>>>>>>
> >>>>>>>     >>>>                  >
> >>>>>>>     >>>>                  >      >
> >>>>>>>     >>>>                  version
> >>>>>>>     >>>>                  2.22.0.
> >>>>>>>     >>>>                  After
> >>>>>>>     >>>>                  that we
> >>>>>>>     >>>>                  unmap
> >>>>>>>     >>>>                  the
> >>>>>>>     >>>>                  tuple,
> >>>>>>>     >>>>                  retrieve the
> >>>>>>>     >>>>                  >  necessary
> >>>>>>>     >>>>                  >      >
> >>>>>>>     >>>>                  data
> >>>>>>>     >>>>                  from
> >>>>>>>     >>>>                  message
> >>>>>>>     >>>>                  payload
> >>>>>>>     >>>>                  and dump
> >>>>>>>     >>>>                  the
> >>>>>>>     >>>>                  corresponding
> >>>>>>>     >>>>                  data into
> >>>>>>>     >>>>                  >
> >>>>>>>     >>>>                   the log.
> >>>>>>>     >>>>                  >      >
> >>>>>>>     >>>>                  >      >
> >>>>>>>     >>>>                  >      >
> >>>>>>>     >>>>                  Pipeline:
> >>>>>>>     >>>>                  >      >
> >>>>>>>     >>>>                  >      >
> >>>>>>>     >>>>                  >      >
> >>>>>>>     >>>>                  Flink
> >>>>>>>     >>>>                  configuration:
> >>>>>>>     >>>>                  >      >
> >>>>>>>     >>>>                  >      >
> >>>>>>>     >>>>                  >      >
> >>>>>>>     >>>>                  As we
> >>>>>>>     >>>>                  mentioned before,
> >>>>>>>     >>>>                  we
> >>>>>>>     >>>>                  noticed
> >>>>>>>     >>>>                  that the
> >>>>>>>     >>>>                  memory
> >>>>>>>     >>>>                  usage of the
> >>>>>>>     >>>>                  >      >
> >>>>>>>     >>>>                  jobmanager
> >>>>>>>     >>>>                  and
> >>>>>>>     >>>>                  taskmanager
> >>>>>>>     >>>>                  pod are
> >>>>>>>     >>>>                  steadily
> >>>>>>>     >>>>                  increasing
> >>>>>>>     >>>>                  with no
> >>>>>>>     >>>>                  >
> >>>>>>>     >>>>                   drops no
> >>>>>>>     >>>>                  >      >
> >>>>>>>     >>>>                  matter
> >>>>>>>     >>>>                  what the
> >>>>>>>     >>>>                  current
> >>>>>>>     >>>>                  bandwidth is.
> >>>>>>>     >>>>                  We tried
> >>>>>>>     >>>>                  allocating
> >>>>>>>     >>>>                  more
> >>>>>>>     >>>>                  >  memory
> >>>>>>>     >>>>                  >      >
> >>>>>>>     >>>>                  but it
> >>>>>>>     >>>>                  seems
> >>>>>>>     >>>>                  like no
> >>>>>>>     >>>>                  matter
> >>>>>>>     >>>>                  how much
> >>>>>>>     >>>>                  memory
> >>>>>>>     >>>>                  we
> >>>>>>>     >>>>                  allocate it
> >>>>>>>     >>>>                  >
> >>>>>>>     >>>>                   eventually
> >>>>>>>     >>>>                  >      >
> >>>>>>>     >>>>                  reaches
> >>>>>>>     >>>>                  its
> >>>>>>>     >>>>                  limit
> >>>>>>>     >>>>                  and then
> >>>>>>>     >>>>                  it tries
> >>>>>>>     >>>>                  to
> >>>>>>>     >>>>                  restart
> >>>>>>>     >>>>                  itself.
> >>>>>>>     >>>>                  >      >
> >>>>>>>     >>>>                  >      >
> >>>>>>>     >>>>                  >      >
> >>>>>>>     >>>>                  Sincerely,
> >>>>>>>     >>>>                  David
> >>>>>>>     >>>>                  >      >
> >>>>>>>     >>>>                  >      >
> >>>>>>>     >>>>                  >
> >>>>>>>     >>>>
> >>>>>>>
> 
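For reference, here is a small sketch of why the shifted second window quoted above (FixedWindows of 30 minutes, the second one offset by 15 minutes) should catch duplicates that straddle a boundary of the first. It uses only plain Python and no Beam calls. The helper names are made up for illustration, and the window-start formula is my understanding of how fixed windows are assigned, so please treat it as an assumption rather than the actual runner code.

```python
MINUTE = 60
SIZE = 30 * MINUTE     # window length from the quoted suggestion
OFFSET = 15 * MINUTE   # shift of the second windowing


def fixed_window_start(timestamp, size, offset=0):
    """Start (in seconds) of the fixed window that contains `timestamp`."""
    return timestamp - (timestamp - offset) % size


def same_window(t1, t2, size, offset=0):
    """True if both timestamps fall into the same fixed window."""
    return fixed_window_start(t1, size, offset) == fixed_window_start(t2, size, offset)


# Two copies of one event, a minute apart, straddling the 00:30 boundary.
first = 29 * MINUTE + 30    # 00:29:30
second = 30 * MINUTE + 30   # 00:30:30

caught_by_unshifted = same_window(first, second, SIZE)        # False
caught_by_shifted = same_window(first, second, SIZE, OFFSET)  # True
```

With these numbers the unshifted windowing puts the two copies into different windows, while the shifted one puts both into the 00:15 to 00:45 window. More generally, the boundaries of the two windowings are 15 minutes apart, so two copies less than 15 minutes apart share a window in at least one of them. Copies further apart than that can still slip through.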

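A quick back-of-the-envelope check of the timer counts quoted above, using only the figures from the thread (about 50 messages per second, about 24 hours of runtime, a 30-minute DeduplicatePerKey retention, 5.5 million timers in the heap). It assumes a constant rate, every message being unique, and one timer per key, so it is a rough estimate only.

```python
# Figures quoted in the thread.
MESSAGES_PER_SECOND = 50      # "roughly 50 messages per second"
RUNTIME_HOURS = 24            # "running for about 24 hours"
DEDUP_MINUTES = 30            # DeduplicatePerKey retention mentioned by Max
OBSERVED_TIMERS = 5_500_000   # "5.5 million timers"

# Upper bound if every message is unique and no timer is ever released.
messages_in_runtime = MESSAGES_PER_SECOND * RUNTIME_HOURS * 3600

# Expected live timers if each one is released after the retention period.
messages_in_retention = MESSAGES_PER_SECOND * DEDUP_MINUTES * 60

print(messages_in_runtime)    # 4320000
print(messages_in_retention)  # 90000
print(OBSERVED_TIMERS > messages_in_runtime > messages_in_retention)  # True
```

Under these assumptions the observed count is far above what a 30-minute retention would give and is even above the 24-hour upper bound, which fits the suspicion that the timers are not being released.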