I have a very naive question. I know Jan suggested using two successive overlapping fixed windows with an offset as a temporary solution to dedup the events. However, I am wondering whether a single fixed window of length, let's say, 1 day followed by a deduplicate function is a good alternative? I assume that at the end of the window all the timers will be cleared, which will result in missing some of the duplicates, but I am ok with that.
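For concreteness, here is a pure-Python toy model of the two variants being discussed. This is not Beam code, and the window length, timestamps, and keys are invented; it is only meant to show which duplicates each approach misses:

```python
from collections import defaultdict

def dedup_fixed_windows(events, window_size, offset=0):
    """Per-key dedup whose 'seen' state is scoped to a fixed window: once a
    window ends (its cleanup timer fires), that window's state is gone, so a
    duplicate landing in the next window is emitted again."""
    seen = defaultdict(set)  # window index -> keys seen in that window
    out = []
    for ts, key in events:
        w = (ts - offset) // window_size
        if key not in seen[w]:
            seen[w].add(key)
            out.append((ts, key))
    return out

# Single 1-day (86400 s) window + dedup: the repeat of 'a' inside the window
# is dropped; the repeat just past the window boundary slips through (the
# accepted miss described above).
events = [(100, 'a'), (200, 'a'), (300, 'b'), (86500, 'a')]
print(dedup_fixed_windows(events, 86400))
# -> [(100, 'a'), (300, 'b'), (86500, 'a')]

# Jan's workaround: a second dedup over windows shifted by half the length
# also catches duplicates that straddle a boundary of the first series.
boundary = [(1799, 'a'), (1801, 'a')]       # straddles a 30-min boundary
once = dedup_fixed_windows(boundary, 1800)  # both survive the first pass
print(dedup_fixed_windows(once, 1800, 900)) # the shifted pass drops the dup
# -> [(1799, 'a')]
```

In real Beam the per-window state scoping happens automatically because stateful DoFn state and timers are keyed per window, which is exactly why the single-window alternative drops state (and duplicates) at the window boundary.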
My pipeline looks something like the following: https://pasteboard.co/JoWL0HP.png

It seems to be working when I tested it, but I wanted to double check, especially considering the following statement taken from the Beam documentation (https://beam.apache.org/documentation/programming-guide/#windowing): "If you set a windowing function using the Window transform, each element is assigned to a window, but the windows are not considered until GroupByKey or Combine aggregates across a window and key."

P.S. This is my 5th attempt to post a reply. I hope this reply will be posted... not sure why my previous emails didn't make it through.

On 2020/08/27 10:28:48, Jan Lukavský <je...@seznam.cz> wrote:
> > If the user chooses to create a window of 10 years, I'd say it is expected behavior that the state will be kept for as long as this duration.
>
> State will be kept; the problem is that each key in the window will carry a cleanup timer, although there might be nothing to clear (there is no state to be kept). This suboptimality is really related only to these cases, and there is nothing special about global windows in there. It is only that other large windowfns are really rare, but that is a coincidence, not a cause.
>
> Nevertheless, I'm fine with your proposed solution; we might extend it in the future if we find it useful. :)
>
> Jan
>
> On 8/27/20 12:06 PM, Maximilian Michels wrote:
> > If the user chooses to create a window of 10 years, I'd say it is expected behavior that the state will be kept for as long as this duration.
> >
> > GlobalWindows are different because they represent the default case where the user does not even use windowing. I think it warrants being treated differently, especially because cleanup simply cannot be ensured by the watermark.
> > It would be possible to combine both approaches, but I'd rather not skip the cleanup timer for non-global windows because that could easily become the source of another leak. The more pressing issue here is the global window, not specific windowing.
> >
> > -Max
> >
> > On 26.08.20 10:15, Jan Lukavský wrote:
> >> Window triggering is afaik an operation that is specific to GBK. Stateful DoFns can have (as shown in the case of deduplication) timers set for GC only; triggering has no effect there. And yes, if we have timers other than GC (any user timers), then we have to have a GC timer (because timers are a form of state).
> >>
> >> Imagine an (admittedly artificial) example of deduplication in a fixed window of 10 years. It would exhibit exactly the same state growth as the global window (and 10 years is "almost infinite", right? :)).
> >>
> >> Jan
> >>
> >> On 8/26/20 10:01 AM, Maximilian Michels wrote:
> >>>> The inefficiency described happens if and only if the following two conditions are met:
> >>>>
> >>>> a) there are many timers per single window (as otherwise they will be negligible)
> >>>>
> >>>> b) there are many keys which actually contain no state (as otherwise the timer would be negligible wrt the state size)
> >>>
> >>> Each window has to have a timer set; it is unavoidable for the window computation to be triggered accordingly. This happens regardless of whether we have state associated with the key/window or not. The additional cleanup timer is just a side effect and not a concern in my opinion. Since window computation is per-key, there is no way around this. I don't think skipping the cleanup timer for non-global windows without state is a good idea, just to save one cleanup timer, when there are already timers created for the window computation.
> >>> Now, the global window is different in that respect because we can't assume it is going to be triggered for unbounded streams. Thus, it makes sense to me to handle it differently by not using triggers but cleaning up once a watermark > MAX_TIMESTAMP has been processed.
> >>>
> >>> -Max
> >>>
> >>> On 26.08.20 09:20, Jan Lukavský wrote:
> >>>> On 8/25/20 9:27 PM, Maximilian Michels wrote:
> >>>>
> >>>>>> I agree that this probably solves the described issue in the most straightforward way, but special handling for the global window feels weird, as there is really nothing special about the global window wrt state cleanup.
> >>>>>
> >>>>> Why is special handling for the global window weird? After all, it is a special case because the global window normally will only be cleaned up when the application terminates.
> >>>>
> >>>> The inefficiency described happens if and only if the following two conditions are met:
> >>>>
> >>>> a) there are many timers per single window (as otherwise they will be negligible)
> >>>>
> >>>> b) there are many keys which actually contain no state (as otherwise the timer would be negligible wrt the state size)
> >>>>
> >>>> It only happens to be the case that the global window is (by far, maybe 98% of cases) the most common case that satisfies these two conditions, but there are other cases as well (e.g. a long-lasting fixed window). Discussed options 2) and 3) are systematic in the sense that option 2) cancels property a) and option 3) property b). Making use of the correlation of the global window with these two conditions to solve the issue is of course possible, but a little unsystematic, and that's what feels 'weird'. :)
> >>>>
> >>>>>> It doesn't change anything wrt migration. The timers that were already set remain and keep on contributing to the state size.
> >>>>> That's ok, regular timers for non-global windows need to remain set and should be persisted. They will be redistributed when scaling up and down.
> >>>>>
> >>>>>> I'm not sure that's a "problem", rather an inefficiency. But we could address it by deleting the timers where they are currently set, as mentioned previously.
> >>>>>
> >>>>> I had imagined that we don't even set these timers for the global window. Thus, there is no need to clean them up.
> >>>>>
> >>>>> -Max
> >>>>>
> >>>>> On 25.08.20 09:43, Jan Lukavský wrote:
> >>>>>> I agree that this probably solves the described issue in the most straightforward way, but special handling for the global window feels weird, as there is really nothing special about the global window wrt state cleanup. A solution that handles all windows equally would be semantically 'cleaner'. If I try to sum up:
> >>>>>>
> >>>>>> - option 3) seems best, provided that the isEmpty() lookup is cheap for every state backend (e.g. that we do not hit disk multiple times); this option is the best for state size wrt timers in all windows
> >>>>>>
> >>>>>> - option 2) works well for key-aligned windows, and also reduces state size in all windows
> >>>>>>
> >>>>>> - option "watermark timer" - solves the issue, easily implemented, but doesn't improve the situation for non-global windows
> >>>>>>
> >>>>>> My conclusion would be - use the watermark timer as a hotfix; if we can prove that isEmpty() would be cheap, then use option 3) as the final solution, otherwise use 2).
> >>>>>>
> >>>>>> WDYT?
> >>>>>>
> >>>>>> On 8/25/20 5:48 AM, Thomas Weise wrote:
> >>>>>>> On Mon, Aug 24, 2020 at 1:50 PM Maximilian Michels <m...@apache.org> wrote:
> >>>>>>>
> >>>>>>>     I'd suggest a modified option (2) which does not use a timer to perform the cleanup (as mentioned, this will cause problems with migrating state).
> >>>>>>>
> >>>>>>> That's a great idea. It's essentially a mix of 1) and 2) for the global window only.
> >>>>>>>
> >>>>>>> It doesn't change anything wrt migration. The timers that were already set remain and keep on contributing to the state size.
> >>>>>>>
> >>>>>>> I'm not sure that's a "problem", rather an inefficiency. But we could address it by deleting the timers where they are currently set, as mentioned previously.
> >>>>>>>
> >>>>>>>     Instead, whenever we receive a watermark which closes the global window, we enumerate all keys and clean up the associated state.
> >>>>>>>
> >>>>>>> This is the cleanest and simplest option.
> >>>>>>>
> >>>>>>>     -Max
> >>>>>>>
> >>>>>>>     On 24.08.20 20:47, Thomas Weise wrote:
> >>>>>>> > On Mon, Aug 24, 2020 at 11:35 AM Jan Lukavský <je...@seznam.cz> wrote:
> >>>>>>> >
> >>>>>>> > > The most general solution would be 3), given it can be agnostic to window types and does not assume extra runner capabilities.
> >>>>>>> >
> >>>>>>> > Agree, 2) is optimization to that.
> >>>>>>> > It might be questionable whether this is premature optimization, but generally querying multiple states on each clear operation of any state might be prohibitive, mostly when the state is stored in an external database (in case of Flink that would be RocksDB).
> >>>>>>> >
> >>>>>>> > For the use case I'm looking at, we are using the heap state backend. I have not checked RocksDB, but would assume that the incremental cost of isEmpty() for other states under the same key is negligible?
> >>>>>>> >
> >>>>>>> > > 3) wouldn't require any state migration.
> >>>>>>> >
> >>>>>>> > Actually, it would, as we would (ideally) like to migrate users' pipelines that already contain timers for the end of the global window, which might never expire.
> >>>>>>> >
> >>>>>>> > Good catch. This could potentially be addressed by upgrading the timer in the per-record path.
> >>>>>>> >
> >>>>>>> > On 8/24/20 7:44 PM, Thomas Weise wrote:
> >>>>>>> >> On Fri, Aug 21, 2020 at 12:32 AM Jan Lukavský <je...@seznam.cz> wrote:
> >>>>>>> >>
> >>>>>>> >> If there are runners that are unable to efficiently enumerate keys in state, then there probably isn't a runner-agnostic solution to this. If we focus on Flink, we can provide a specific implementation of CleanupTimer, which might then do anything from the mentioned options. I'd be +1 for option 2) for key-aligned windows (all currently supported) and option 3) for unaligned windows in the future.
> >>>>>>> >>
> >>>>>>> >> The most general solution would be 3), given it can be agnostic to window types and does not assume extra runner capabilities. It would require introspecting all user states for a given key on state.clear. That assumes an efficient implementation of isEmpty(). If all states are empty (have been cleared), then we can remove the cleanup timer. And add it back on state.add. I'm planning to give that a shot (for Flink/portable/streaming) to see how it performs.
> >>>>>>> >>
> >>>>>>> >> We should also consider how we migrate users from the current state to any future implementation. In case of option 2) it should be possible to do this when the state is loaded from a savepoint, but I'm not 100% sure about that.
> >>>>>>> >>
> >>>>>>> >> 3) wouldn't require any state migration.
> >>>>>>> >>
> >>>>>>> >> Jan
> >>>>>>> >>
> >>>>>>> >> On 8/21/20 6:25 AM, Thomas Weise wrote:
> >>>>>>> >>> Thanks for the clarification.
> >>>>>>> >>>
> >>>>>>> >>> Here are a few potential options to address the issue, based on the discussion so far:
> >>>>>>> >>>
> >>>>>>> >>> 1) Optionally skip the cleanup timer for the global window (user-controlled via pipeline option)
> >>>>>>> >>>
> >>>>>>> >>> 2) Instead of setting a cleanup timer for every key, handle all keys for a given window with a single timer. This would be runner-specific and depend on if/how a given runner supports key enumeration. Flink's keyed state backend supports enumerating keys for a namespace (Beam window) and state tag.
> >>>>>>> >>> [1]
> >>>>>>> >>>
> >>>>>>> >>> 3) Set the cleanup timer only when there is actually state associated with a key. This could be accomplished by intercepting append and clear in BagUserStateHandler [2] and adding/removing the timer appropriately.
> >>>>>>> >>>
> >>>>>>> >>> 4) See if TTL support in the runner is applicable; for Flink see [3]
> >>>>>>> >>>
> >>>>>>> >>> [1] https://github.com/apache/flink/blob/release-1.10/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java#L76
> >>>>>>> >>> [2] https://github.com/apache/beam/blob/release-2.23.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java#L315
> >>>>>>> >>> [3] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
> >>>>>>> >>>
> >>>>>>> >>> On Thu, Aug 20, 2020 at 8:08 AM Reuven Lax <re...@google.com> wrote:
> >>>>>>> >>>
> >>>>>>> >>> Also +1 to what Jan said. Streaming pipelines can process bounded PCollections on some paths, so the global window will terminate for those paths. This is also true for the direct runner tests where PCollections pretend to be unbounded, but we then advance the watermark to +inf to terminate the pipeline.
> >>>>>>> >>>
> >>>>>>> >>> On Thu, Aug 20, 2020 at 8:06 AM Reuven Lax <re...@google.com> wrote:
> >>>>>>> >>>
> >>>>>>> >>> It is not Dataflow specific, but I think Dataflow is the only runner that currently implements Drain: https://docs.google.com/document/d/1NExwHlj-2q2WUGhSO4jTu8XGhDPmm3cllSN8IMmWci8/edit
> >>>>>>> >>>
> >>>>>>> >>> When a pipeline is drained, all windows (including global windows) end, and the windows are processed (i.e. as if they were fixed windows that terminated). Currently the easiest way to ensure that is to rely on the end-of-window timers for the global window (alternatives are possible, like issuing a full-state scan when a pipeline is drained, but that would be quite a bit more complicated). This is not specifically the GC timer, but rather the end-of-window timer that is needed.
> >>>>>>> >>>
> >>>>>>> >>> I believe that right now we don't have a way of deleting timers if there are no elements buffered for a key (e.g. a key that received a few elements that were processed in a trigger and then never received any more elements). This might be part of the problem - large numbers of empty keys with noop timers set. It would be nice if there were a way to detect this and at least remove the timers for those empty keys.
> >>>>>>> >>>
> >>>>>>> >>> Reuven
> >>>>>>> >>>
> >>>>>>> >>> On Wed, Aug 19, 2020 at 9:20 PM Thomas Weise <t...@apache.org> wrote:
> >>>>>>> >>>
> >>>>>>> >>> On Wed, Aug 19, 2020 at 9:49 AM Reuven Lax <re...@google.com> wrote:
> >>>>>>> >>>
> >>>>>>> >>>     Skipping the cleanup timer for the global window will break any sort of drain functionality, which relies on having those timers there. It's also necessary for bounded inputs, for the same reason.
> >>>>>>> >>>
> >>>>>>> >>> Can you say a bit more about why this will break drain functionality and bounded inputs? Is this Dataflow specific? Is it because the state would be reused by a subsequent instance of the pipeline?
> >>>>>>> >>>
> >>>>>>> >>> For Flink, the GC timers would be triggered by the final watermark and that will be the end of the streaming job. Launching the same pipeline again will either be a cold start with no previous state or a start from a savepoint/checkpoint.
> >>>>>>> >>>
> >>>>>>> >>> It sounds like for Dataflow there may be a need for the user to influence the behavior, while for Flink the GC timers in a global window are not required.
> >>>>>>> >>>
> >>>>>>> >>> On Wed, Aug 19, 2020 at 10:31 AM Reuven Lax <re...@google.com> wrote:
> >>>>>>> >>>
> >>>>>>> >>> On Wed, Aug 19, 2020 at 9:53 AM Steve Niemitz <sniem...@apache.org> wrote:
> >>>>>>> >>>
> >>>>>>> >>>     for what it's worth, dataflow has the same problem here as well. We've also worked around it by (optionally) disabling the cleanup timer in global windows. But I agree, having drain then be an unsafe operation is not great.
> >>>>>>> >>>
> >>>>>>> >>> Dataflow does not require the timers to be in memory though, so unless the numbers get very large (to the point where you run out of disk storage storing the timers), it will not cause your pipelines to fail.
> >>>>>>> >>>
> >>>>>>> >>>     I think for batch it's less of an issue since basically everything is in the global window anyways, and batch pipelines run for a fixed amount of time on a fixed input source. For streaming pipelines, it's much easier to run into this.
> >>>>>>> >>>
> >>>>>>> >>> On Wed, Aug 19, 2020 at 12:50 PM Reuven Lax <re...@google.com> wrote:
> >>>>>>> >>>
> >>>>>>> >>> @OnWindowExpiration is a per-key callback.
> >>>>>>> >>>
> >>>>>>> >>> On Wed, Aug 19, 2020 at 9:48 AM Luke Cwik <lc...@google.com> wrote:
> >>>>>>> >>>
> >>>>>>> >>> With the addition of @OnWindowExpiration, a single-timer-across-keys optimization would still make sense.
> >>>>>>> >>>
> >>>>>>> >>> On Wed, Aug 19, 2020 at 8:51 AM Thomas Weise <t...@apache.org> wrote:
> >>>>>>> >>>
> >>>>>>> >>> https://issues.apache.org/jira/browse/BEAM-10760
> >>>>>>> >>>
> >>>>>>> >>> I confirmed that skipping the cleanup timers resolves the state leak that we observe in the pipeline that uses a global window.
> >>>>>>> >>>
> >>>>>>> >>> @Luke the GC is key-partitioned and relies on StateInternals. That makes it impractical to have a single timer that performs cleanup for multiple keys, at least in a runner-agnostic way.
> >>>>>>> >>>
> >>>>>>> >>> I would like to take a look at whether there is a need to have the GC timer for a global window to start with. Since the pipeline terminates, the runner discards all state anyway - at least in the case of Flink.
> >>>>>>> >>>
> >>>>>>> >>> Thomas
> >>>>>>> >>>
> >>>>>>> >>> On Mon, Aug 17, 2020 at 9:46 AM Luke Cwik <lc...@google.com> wrote:
> >>>>>>> >>>
> >>>>>>> >>> For the cleanup timer.
> >>>>>>> >>>
> >>>>>>> >>> On Mon, Aug 17, 2020 at 9:45 AM Luke Cwik <lc...@google.com> wrote:
> >>>>>>> >>>
> >>>>>>> >>> Replacing a timer for each key with just one timer for all keys would make sense for the global window.
> >>>>>>> >>>
> >>>>>>> >>> On Sun, Aug 16, 2020 at 5:54 PM Thomas Weise <t...@apache.org> wrote:
> >>>>>>> >>>
> >>>>>>> >>> Thanks Jan. We observe a similar issue with state size growth in the global window (with the portable runner). We don't see this issue with non-global windows; there does not appear to be any residual. I will take a look at skipping the cleanup timers for the global window and see if that resolves the issue. These timers lead to potentially unbounded state growth and don't really serve a purpose.
> >>>>>>> >>>
> >>>>>>> >>> Thomas
> >>>>>>> >>>
> >>>>>>> >>> On Sun, Aug 16, 2020 at 1:16 AM Jan Lukavský <je...@seznam.cz> wrote:
> >>>>>>> >>>
> >>>>>>> >>> Hi Catlyn,
> >>>>>>> >>>
> >>>>>>> >>> if you use the global window to perform the deduplication, then it should be expected to have as many timers as there are unique keys, plus one timer for each key that arrived during the last 30 minutes (because there is a timer set to clear the state in the deduplication function). The reason for that is that Beam creates a timer for the window garbage collection time to clear state (see [1]). If it is the global window, then each key will have an associated timer forever (it might be an open question whether that makes sense in this case, or whether Beam can do any better).
> >>>>>>> >>>
> >>>>>>> >>> As I wrote before, it would probably help to use two deduplications in two successive fixed windows of length 30 minutes, shifted by 15 minutes (FixedWindows.of(30 minutes).withOffset(15 minutes)), so that the two windows overlap and catch duplicates that would appear near the boundary of the first window.
> >>>>>>> >>>
> >>>>>>> >>> @Max, do you think it would be possible to schedule the cleanup timer only when there is actually data in state for a given key? The timer would be cleared on a call to `clear()`, but would have to be set on every write. Or would it make sense not to schedule the cleanup timer for the global window at all?
> >>>>>>> >>>
> >>>>>>> >>> Jan
> >>>>>>> >>>
> >>>>>>> >>> [1] https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java#L334
> >>>>>>> >>>
> >>>>>>> >>> On 8/15/20 5:47 PM, Catlyn Kong wrote:
> >>>>>>> >>>> Hi!
> >>>>>>> >>>>
> >>>>>>> >>>> Thanks for the explanation!
> >>>>>>> >>>> The screenshot actually shows all the new instances between marking the heap and taking a heap dump, so sorry if that's a little confusing. Here's what the full heap looks like:
> >>>>>>> >>>> Screen Shot 2020-08-15 at 8.31.42 AM.png
> >>>>>>> >>>>
> >>>>>>> >>>> Our input stream has roughly 50 messages per second and the pipeline has been running for about 24 hours. Even assuming all the messages are unique, 5.5 million timers is still very surprising.
> >>>>>>> >>>>
> >>>>>>> >>>> We're allocating 11G for the taskmanager JVM heap, but it eventually gets filled up (after a couple of days) and the cluster ends up in a bad state. Here's a screenshot of the heap size over the past 24h:
> >>>>>>> >>>> Screen Shot 2020-08-15 at 8.41.48 AM.png
> >>>>>>> >>>>
> >>>>>>> >>>> Could it be that the timers never got cleared out, or maybe the pipeline is creating more timer instances than expected?
> >>>>>>> >>>>
> >>>>>>> >>>> On Sat, Aug 15, 2020 at 4:07 AM Maximilian Michels <m...@apache.org> wrote:
> >>>>>>> >>>>
> >>>>>>> >>>> Awesome! Thanks a lot for the memory profile. Couple of remarks:
> >>>>>>> >>>>
> >>>>>>> >>>> a) I can see that there are about 378k keys and each of them sets a timer.
> >>>>>>> >>>>
> >>>>>>> >>>> b) Based on the settings for DeduplicatePerKey you posted, you will keep track of all keys of the last 30 minutes.
> >>>>>>> >>>>
> >>>>>>> >>>> Unless you have many fewer keys, the behavior is to be expected. The memory sizes for the timer maps do not look particularly high (~12Mb).
> >>>>>>> >>>>
> >>>>>>> >>>> How much memory did you reserve for the task managers?*
> >>>>>>> >>>>
> >>>>>>> >>>> -Max
> >>>>>>> >>>>
> >>>>>>> >>>> *The image links give me a "504 error".
> >>>>>>> >>>>
> >>>>>>> >>>> On 14.08.20 23:29, Catlyn Kong wrote:
> >>>>>>> >>>> > Hi!
> >>>>>>> >>>> >
> >>>>>>> >>>> > We're indeed using the rocksdb state backend, so that might be part of the reason. Due to some security concerns, we might not be able to provide the full heap dump since we have some custom code paths. But here's a screenshot from JProfiler:
> >>>>>>> >>>> > Screen Shot 2020-08-14 at 9.10.07 AM.png
> >>>>>>> >>>> > Looks like TimerHeapInternalTimer (initiated in InternalTimerServiceImpl <https://github.com/apache/flink/blob/5125b1123dfcfff73b5070401dfccb162959080c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L46>) isn't getting garbage collected?
> As David has mentioned, the pipeline uses DeduplicatePerKey <https://beam.apache.org/releases/pydoc/2.22.0/_modules/apache_beam/transforms/deduplicate.html#DeduplicatePerKey> from Beam 2.22; ProcessConnectionEventFn is a simple stateless DoFn that just does some logging and emits the events. Is there any possibility that the timer logic, or the way it's used in the dedupe ParDo, can cause this leak?
>
> Thanks,
> Catlyn
>
> On Tue, Aug 11, 2020 at 7:58 AM Maximilian Michels <m...@apache.org> wrote:
>
> Hi!
>
> Looks like a potential leak, caused by your code or by Beam itself.
> Would you be able to supply a heap dump from one of the task managers? That would greatly help with debugging this issue.
>
> -Max
>
> On 07.08.20 00:19, David Gogokhiya wrote:
> > Hi,
> >
> > We recently started using Apache Beam version 2.20.0 running on Flink version 1.9, deployed on Kubernetes, to process unbounded streams of data. However, we noticed that the memory consumed by stateful Beam is steadily increasing over time with no drops, no matter what the current bandwidth is. We were wondering if this is expected and, if not, what would be the best way to resolve it.
> > More Context
> >
> > We have the following pipeline that consumes messages from the unbounded stream of data. Later we deduplicate the messages based on a unique message id using the deduplicate function <https://beam.apache.org/releases/pydoc/2.22.0/_modules/apache_beam/transforms/deduplicate.html#DeduplicatePerKey>. Since we are using Beam version 2.20.0, we copied the source code of the deduplicate function from version 2.22.0. After that we unmap the tuple, retrieve the necessary data from the message payload, and dump the corresponding data into the log.
> > Pipeline: (image attachment)
> >
> > Flink configuration: (image attachment)
> >
> > As we mentioned before, we noticed that the memory usage of the jobmanager and taskmanager pods is steadily increasing with no drops, no matter what the current bandwidth is. We tried allocating more memory, but it seems like no matter how much memory we allocate, it eventually reaches its limit and then tries to restart itself.
> >
> > Sincerely,
> > David
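The pipeline David describes (consume from an unbounded stream, deduplicate on a unique message id, unmap the tuple, extract the payload, log it) can be sketched in plain Python. All names here (`dedupe_and_log`, the payload shape) are hypothetical, and note one deliberate simplification: unlike Beam's DeduplicatePerKey, this toy version never expires keys, so its `seen` set grows without bound with the number of distinct ids, which is exactly the kind of unbounded state growth this thread is about.

```python
import logging
from typing import Iterable, List, Tuple

def dedupe_and_log(events: Iterable[Tuple[str, dict]]) -> List:
    """Plain-Python sketch of the described pipeline stages:
    key by message id -> deduplicate -> unmap tuple -> extract payload -> log.

    Hypothetical illustration only; the real pipeline uses Beam's
    DeduplicatePerKey transform, which expires its state after a
    configured duration instead of keeping every id forever.
    """
    seen = set()  # grows with distinct ids; a real dedup needs expiry
    out = []
    for message_id, payload in events:
        if message_id in seen:        # duplicate message id: drop
            continue
        seen.add(message_id)
        data = payload.get("data")    # retrieve the necessary fields
        logging.info("event %s: %s", message_id, data)
        out.append(data)
    return out

events = [("m1", {"data": "x"}), ("m1", {"data": "x"}), ("m2", {"data": "y"})]
assert dedupe_and_log(events) == ["x", "y"]
```

With RocksDB as the state backend, the per-key dedup state lives off the JVM heap, so steadily growing container memory with a healthy-looking heap is consistent with state (or its timers) accumulating rather than a plain Java object leak.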