I'd suggest a modified option (2) which does not use a timer to perform the cleanup (as mentioned, this will cause problems with migrating state).

Instead, whenever we receive a watermark which closes the global window, we enumerate all keys and clean up the associated state.

This is the cleanest and simplest option.
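
To make the idea concrete, here is a rough sketch of what this could look like in a Flink operator. This is not the actual Beam runner code: the operator class, the ValueStateDescriptor, and the VoidNamespace handling are simplified placeholders (in the runner the namespace would be the encoded Beam window).

import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class GlobalWindowCleanupOperator extends AbstractStreamOperator<String>
    implements OneInputStreamOperator<String, String> {

  // Placeholder for the per-key user state that would otherwise leak forever.
  private final ValueStateDescriptor<String> userStateDescriptor =
      new ValueStateDescriptor<>("user-state", String.class);

  @Override
  public void processElement(StreamRecord<String> element) throws Exception {
    // ... regular per-element processing that writes to the user state ...
    output.collect(element);
  }

  @Override
  public void processWatermark(Watermark mark) throws Exception {
    // Long.MAX_VALUE is Flink's final watermark; at that point the global window
    // is closed and all per-key state can be dropped in a single pass over the
    // keys, instead of firing one cleanup timer per key.
    if (mark.getTimestamp() == Long.MAX_VALUE) {
      getKeyedStateBackend()
          .applyToAllKeys(
              VoidNamespace.INSTANCE,
              VoidNamespaceSerializer.INSTANCE,
              userStateDescriptor,
              (key, state) -> state.clear());
    }
    super.processWatermark(mark);
  }
}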

-Max

On 24.08.20 20:47, Thomas Weise wrote:

On Mon, Aug 24, 2020 at 11:35 AM Jan Lukavský <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:

     > The most general solution would be 3), given it can be agnostic
    to window types and does not assume extra runner capabilities.

    Agree, 2) is an optimization of that. It might be questionable whether
    this is premature optimization, but in general, querying multiple states
    on each clear operation of any state might be prohibitive, especially
    when the state is stored in an external database (in the case of Flink
    that would be RocksDB).

For the use case I'm looking at, we are using the heap state backend. I have not checked RocksDB, but I would assume that the incremental cost of isEmpty() for other states under the same key is negligible?

     > 3) wouldn't require any state migration.

    Actually, it would, as we would (ideally) like to migrate users'
    pipelines that already contain timers for the end of the global window,
    which might never expire.

Good catch. This could potentially be addressed by upgrading the timer in the per-record path.

    On 8/24/20 7:44 PM, Thomas Weise wrote:

    On Fri, Aug 21, 2020 at 12:32 AM Jan Lukavský <je...@seznam.cz
    <mailto:je...@seznam.cz>> wrote:

        If there are runners that are unable to efficiently enumerate keys
        in state, then there probably isn't a runner-agnostic solution to
        this. If we focus on Flink, we can provide a specific implementation
        of CleanupTimer, which might then do anything from the mentioned
        options. I'd be +1 for option 2) for key-aligned windows (all
        currently supported) and option 3) for unaligned windows in the
        future.

    The most general solution would be 3), given it can be agnostic to
    window types and does not assume extra runner capabilities. It would
    require introspecting all user states for a given key on state.clear.
    That assumes an efficient implementation of isEmpty(). If all states
    are empty (have been cleared), then we can remove the cleanup timer,
    and add it back on state.add. I'm planning to give that a shot (for
    Flink/portable/streaming) to see how it performs.
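
    For illustration, a hedged sketch of that bookkeeping in plain Java
    (UserState, CleanupTimer, and StateCleanupTracker are made-up names,
    not the actual Beam runner classes):

    import java.util.Collection;

    interface UserState {
      boolean isEmpty(); // assumed to be cheap in the underlying state backend
      void append(byte[] value);
      void clear();
    }

    interface CleanupTimer {
      void set();    // schedule the GC timer at window.maxTimestamp() + 1
      void delete(); // remove the pending GC timer
    }

    final class StateCleanupTracker {
      private final Collection<UserState> statesForKey;
      private final CleanupTimer cleanupTimer;

      StateCleanupTracker(Collection<UserState> statesForKey, CleanupTimer cleanupTimer) {
        this.statesForKey = statesForKey;
        this.cleanupTimer = cleanupTimer;
      }

      /** Called whenever user code appends to any state of this key. */
      void onAppend(UserState state, byte[] value) {
        state.append(value);
        // Any write (re-)arms the cleanup timer for the key.
        cleanupTimer.set();
      }

      /** Called whenever user code clears a state of this key. */
      void onClear(UserState state) {
        state.clear();
        // If every state of this key is now empty, the timer is no longer needed.
        if (statesForKey.stream().allMatch(UserState::isEmpty)) {
          cleanupTimer.delete();
        }
      }
    }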

        We should also consider how we migrate users from the current state
        to any future implementation. In the case of option 2) it should be
        possible to do this when the state is loaded from a savepoint, but
        I'm not 100% sure about that.

    3) wouldn't require any state migration.

        Jan

        On 8/21/20 6:25 AM, Thomas Weise wrote:
        Thanks for the clarification.

        Here are a few potential options to address the issue, based
        on the discussion so far:

        1) Optionally skip cleanup timer for global window
        (user-controlled via pipeline option)

        2) Instead of setting a cleanup timer for every key, handle
        all keys for a given window with a single timer. This would
        be runner specific and depend on if/how a given
        runner supports key enumeration. Flink's keyed state backend
        supports enumerating keys for a namespace (Beam window) and
        state tag. [1]

        3) Set the cleanup timer only when there is actually state
        associated with a key. This could be accomplished by
        intercepting append and clear in BagUserStateHandler [2] and
        adding/removing the timer appropriately.

        4) See if TTL support in the runner is applicable; for Flink see [3]

        [1] https://github.com/apache/flink/blob/release-1.10/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java#L76

        [2] https://github.com/apache/beam/blob/release-2.23.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java#L315

        [3] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
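
        If option 4) turns out to be viable, the Flink-side configuration could look
        roughly like this (a sketch only; how the runner would wire the TTL config
        into its Beam state handling is an open question, and note that Flink's
        state TTL is processing-time based):

        import org.apache.flink.api.common.state.StateTtlConfig;
        import org.apache.flink.api.common.state.ValueStateDescriptor;
        import org.apache.flink.api.common.time.Time;

        public class TtlStateDescriptorExample {
          public static ValueStateDescriptor<byte[]> withTtl() {
            StateTtlConfig ttlConfig =
                StateTtlConfig.newBuilder(Time.minutes(30))
                    // Reset the timeout whenever the state is written.
                    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                    // Never hand expired state back to user code.
                    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                    .build();

            ValueStateDescriptor<byte[]> descriptor =
                new ValueStateDescriptor<>("beam-user-state", byte[].class);
            descriptor.enableTimeToLive(ttlConfig);
            return descriptor;
          }
        }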


        On Thu, Aug 20, 2020 at 8:08 AM Reuven Lax <re...@google.com
        <mailto:re...@google.com>> wrote:

            Also +1 to what Jan said. Streaming pipelines can process
            bounded PCollections on some paths, so the global window will
            terminate for those paths. This is also true for the direct
            runner tests where PCollections pretend to be unbounded, but we
            then advance the watermark to +inf to terminate the pipeline.

            On Thu, Aug 20, 2020 at 8:06 AM Reuven Lax
            <re...@google.com <mailto:re...@google.com>> wrote:

                 It is not Dataflow specific, but I think Dataflow is the
                 only runner that currently implements Drain:
                 https://docs.google.com/document/d/1NExwHlj-2q2WUGhSO4jTu8XGhDPmm3cllSN8IMmWci8/edit

                 When a pipeline is drained, all windows (including global
                 windows) end, and the windows are processed (i.e. as if
                 they were fixed windows that terminated). Currently the
                 easiest way to ensure that is to rely on the end-of-window
                 timers for the global window (alternatives are possible,
                 like issuing a full-state scan when a pipeline is drained,
                 but that would be quite a bit more complicated). This is
                 not specifically the GC timer, but rather the end-of-window
                 timer that is needed.

                I believe that right now we don't have a way of
                deleting timers if there are no elements buffered for
                a key (e.g. a key that received a few elements that
                were processed in a trigger and then never received
                any more elements). This might be part of the problem
                - large numbers of empty keys with noop timers set.
                It would be nice if there were a way to detect this
                and at least remove the timers for those empty keys.

                Reuven

                On Wed, Aug 19, 2020 at 9:20 PM Thomas Weise
                <t...@apache.org <mailto:t...@apache.org>> wrote:



                    On Wed, Aug 19, 2020 at 9:49 AM Reuven Lax
                    <re...@google.com <mailto:re...@google.com>> wrote:

                        Skipping the cleanup timer for the global
                        window will break any sort of drain
                        functionality, which relies on having those
                        timers there. It's also necessary for bounded
                        inputs, for the same reason.


                    Can you say a bit more about why this will break
                    drain functionality and bounded inputs? Is this
                    Dataflow specific? Is it because the state would
                    be reused by a subsequent instance of the pipeline?

                    For Flink, the GC timers would be triggered by
                    the final watermark and that will be the end of
                    the streaming job. Launching the same pipeline
                    again will either be a cold start with no
                    previous state or a start from savepoint/checkpoint.

                    It sounds like for Dataflow there may be a need
                    for the user to influence the behavior while for
                    Flink the GC timers in a global window are not
                    required.





                    On Wed, Aug 19, 2020 at 10:31 AM Reuven Lax
                    <re...@google.com <mailto:re...@google.com>> wrote:



                        On Wed, Aug 19, 2020 at 9:53 AM Steve Niemitz
                        <sniem...@apache.org
                        <mailto:sniem...@apache.org>> wrote:

                            for what it's worth, dataflow has the
                            same problem here as well.  We've also
                            worked around it by (optionally)
                            disabling the cleanup timer in global
                            windows.  But I agree, having drain then
                            be an unsafe operation is not great.


                        Dataflow does not require the timers to be in
                        memory though, so unless the numbers get very
                        large (to the point where you run out of disk
                        storage storing the timers), it will not
                        cause your pipelines to fail.


                            I think for batch it's less of an issue
                            since basically everything is in the
                            global window anyways, and batch
                            pipelines run for a fixed amount of time
                            on a fixed input source.  For streaming
                            pipelines, it's much easier to run into
                            this.


                            On Wed, Aug 19, 2020 at 12:50 PM Reuven
                            Lax <re...@google.com
                            <mailto:re...@google.com>> wrote:

                                @OnWindowExpiration is a per-key
                                callback.
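
                                For reference, a minimal sketch of a stateful DoFn that uses @OnWindowExpiration in the Java SDK (the DoFn, its state, and the per-key sum are made up for illustration):

                                import org.apache.beam.sdk.state.StateSpec;
                                import org.apache.beam.sdk.state.StateSpecs;
                                import org.apache.beam.sdk.state.ValueState;
                                import org.apache.beam.sdk.transforms.DoFn;
                                import org.apache.beam.sdk.values.KV;

                                public class SumPerKeyFn extends DoFn<KV<String, Long>, Long> {

                                  @StateId("sum")
                                  private final StateSpec<ValueState<Long>> sumSpec = StateSpecs.value();

                                  @ProcessElement
                                  public void process(
                                      @Element KV<String, Long> element,
                                      @StateId("sum") ValueState<Long> sum) {
                                    Long current = sum.read();
                                    sum.write((current == null ? 0L : current) + element.getValue());
                                  }

                                  // Invoked once per key and window when the window expires, before the
                                  // runner garbage-collects the key's state for that window.
                                  @OnWindowExpiration
                                  public void onWindowExpiration(
                                      @StateId("sum") ValueState<Long> sum, OutputReceiver<Long> out) {
                                    Long current = sum.read();
                                    if (current != null) {
                                      out.output(current);
                                    }
                                  }
                                }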

                                On Wed, Aug 19, 2020 at 9:48 AM Luke
                                Cwik <lc...@google.com
                                <mailto:lc...@google.com>> wrote:

                                    With the addition
                                    of @OnWindowExpiration, a single
                                    timer across keys optimization
                                    would still make sense.

                                    On Wed, Aug 19, 2020 at 8:51 AM
                                    Thomas Weise <t...@apache.org
                                    <mailto:t...@apache.org>> wrote:

                                        
                                        https://issues.apache.org/jira/browse/BEAM-10760

                                        I confirmed that skipping the cleanup timers resolves the state leak that we observe in the pipeline that uses a global window.

                                        @Luke the GC is key-partitioned and relies on StateInternals. That makes it impractical to have a single timer that performs cleanup for multiple keys, at least in a runner-agnostic way.

                                        I would like to take a look at whether there is a need to have the GC timer for a global window to start with. Since the pipeline terminates, the runner discards all state anyways - at least in the case of Flink.

                                        Thomas

                                        On Mon, Aug 17, 2020 at 9:46 AM Luke Cwik <lc...@google.com <mailto:lc...@google.com>> wrote:

                                            For the cleanup timer.

                                            On Mon, Aug 17, 2020 at 9:45 AM Luke Cwik <lc...@google.com <mailto:lc...@google.com>> wrote:

                                                Replacing a timer for each key with just one timer for all keys would make sense for the global window.

                                                On Sun, Aug 16, 2020 at 5:54 PM Thomas Weise <t...@apache.org <mailto:t...@apache.org>> wrote:

                                                     Thanks Jan. We observe a similar issue with state size growth in the global window (with the portable runner). We don't see this issue with non-global windows; there does not appear to be any residual state. I will take a look at skipping the cleanup timers for the global window and see if that resolves the issue. These timers lead to potentially unbounded state growth and don't really serve a purpose.

                                                     Thomas

                                                     On Sun, Aug 16, 2020 at 1:16 AM Jan Lukavský <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:

                                                        Hi Catlyn,

                                                        if you use a global window to perform the deduplication, then it should be expected to have as many timers as there are unique keys, plus one timer for each key that arrived during the last 30 minutes (because a timer is set to clear the state in the deduplication function). The reason is that Beam creates a timer at the window garbage collection time to clear state (see [1]). If it is the global window, then each key will have an associated timer forever (it might be an open question whether that makes sense in this case, or whether Beam can do any better).

                                                        As I wrote before, it would probably help to use two deduplications in two successive fixed windows of length 30 minutes, shifted by 15 minutes (FixedWindows.of(30 minutes).withOffset(15 minutes)), so that the two windows overlap and catch duplicates that would appear near the boundary of the first window.
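
                                                        A minimal sketch of that idea in the Beam Java SDK (the pipeline discussed here uses Python's DeduplicatePerKey; Distinct and the Create source below are only stand-ins for illustration):

                                                        import org.apache.beam.sdk.Pipeline;
                                                        import org.apache.beam.sdk.options.PipelineOptionsFactory;
                                                        import org.apache.beam.sdk.transforms.Create;
                                                        import org.apache.beam.sdk.transforms.Distinct;
                                                        import org.apache.beam.sdk.transforms.windowing.FixedWindows;
                                                        import org.apache.beam.sdk.transforms.windowing.Window;
                                                        import org.apache.beam.sdk.values.PCollection;
                                                        import org.joda.time.Duration;

                                                        public class OverlappingWindowDedup {
                                                          public static void main(String[] args) {
                                                            Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

                                                            // Stand-in source; the real pipeline reads an unbounded stream of events.
                                                            PCollection<String> events = p.apply(Create.of("id-1", "id-2", "id-1"));

                                                            // First dedup pass over 30-minute fixed windows.
                                                            PCollection<String> firstPass =
                                                                events
                                                                    .apply("Window30m", Window.<String>into(FixedWindows.of(Duration.standardMinutes(30))))
                                                                    .apply("Dedup1", Distinct.<String>create());

                                                            // Second pass over 30-minute windows shifted by 15 minutes, so duplicates
                                                            // that straddle a boundary of the first windowing are still caught.
                                                            firstPass
                                                                .apply("Window30mShifted", Window.<String>into(FixedWindows.of(Duration.standardMinutes(30)).withOffset(Duration.standardMinutes(15))))
                                                                .apply("Dedup2", Distinct.<String>create());

                                                            p.run();
                                                          }
                                                        }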

                                                        @Max, do you think it would be possible to schedule the cleanup timer only when there is actually data in state for the given key? The timer would be cleared on the call to `clear()`, but would have to be set on every write. Or would it make sense not to schedule the cleanup timer for the global window at all?

                                                        Jan

                                                        [1] https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java#L334

                                                        On 8/15/20 5:47 PM, Catlyn Kong wrote:
                                                        Hi!

                                                        Thanks for the explanation! The screenshot actually shows all the new instances between marking the heap and taking a heap dump, so sorry if that's a little confusing. Here's what the full heap looks like: Screen Shot 2020-08-15 at 8.31.42 AM.png
                                                        Our input stream has roughly 50 messages per second and the pipeline has been running for about 24 hours. Even assuming all the messages are unique, 5.5 million timers is still very surprising.

                                                        We're allocating 11G for the taskmanager JVM heap, but it eventually gets filled up (after a couple of days) and the cluster ends up in a bad state. Here's a screenshot of the heap size over the past 24h: Screen Shot 2020-08-15 at 8.41.48 AM.png

                                                        Could it be that the timers never got cleared out, or maybe the pipeline is creating more timer instances than expected?

                                                        On Sat, Aug 15, 2020 at 4:07 AM Maximilian Michels <m...@apache.org <mailto:m...@apache.org>> wrote:

                                                            Awesome! Thanks a lot for the memory profile. Couple remarks:

                                                            a) I can see that there are about 378k keys and each of them sets a timer.
                                                            b) Based on the settings for DeduplicatePerKey you posted, you will keep track of all keys of the last 30 minutes.

                                                            Unless you have much fewer keys, the behavior is to be expected. The memory sizes for the timer maps do not look particularly high (~12Mb).

                                                            How much memory did you reserve for the task managers?*

                                                            -Max

                                                            *The image links give me a "504 error".

                                                            On 14.08.20 23:29, Catlyn Kong wrote:
                                                            > Hi!
                                                            >
                                                            > We're indeed using the RocksDB state backend, so that might be part of the reason. Due to some security concerns, we might not be able to provide the full heap dump since we have some custom code path. But here's a screenshot from JProfiler:
                                                            > Screen Shot 2020-08-14 at 9.10.07 AM.png
                                                            > Looks like TimerHeapInternalTimer (initiated in InternalTimerServiceImpl <https://github.com/apache/flink/blob/5125b1123dfcfff73b5070401dfccb162959080c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L46>) isn't getting garbage collected? As David has mentioned, the pipeline uses DeduplicatePerKey <https://beam.apache.org/releases/pydoc/2.22.0/_modules/apache_beam/transforms/deduplicate.html#DeduplicatePerKey> in Beam 2.22; ProcessConnectionEventFn is a simple stateless DoFn that just does some logging and emits the events. Is there any possibility that the timer logic or the way it's used in the dedupe ParDo can cause this leak?
                                                            >
                                                            > Thanks,
                                                            > Catlyn
                                                            >
                                                            > On Tue, Aug 11, 2020 at 7:58 AM Maximilian Michels <m...@apache.org <mailto:m...@apache.org>> wrote:
                                                            >
                                                            >     Hi!
                                                            >
                                                            >     Looks like a potential leak, caused by your code or by Beam itself. Would you be able to supply a heap dump from one of the task managers? That would greatly help debugging this issue.
                                                            >
                                                            >     -Max
                                                            >
                                                            >     On 07.08.20 00:19, David Gogokhiya wrote:
                                                            >      > Hi,
                                                            >      >
                                                            >      > We recently started using Apache Beam version 2.20.0 running on Flink version 1.9 deployed on Kubernetes to process unbounded streams of data. However, we noticed that the memory consumed by stateful Beam is steadily increasing over time with no drops, no matter what the current bandwidth is. We were wondering if this is expected and if not what would be the best way to resolve it.
                                                            >      >
                                                            >      > More Context
                                                            >      >
                                                            >      > We have the following pipeline that consumes messages from the unbounded stream of data. Later we deduplicate the messages based on unique message id using the deduplicate function <https://beam.apache.org/releases/pydoc/2.22.0/_modules/apache_beam/transforms/deduplicate.html#DeduplicatePerKey>. Since we are using Beam version 2.20.0, we copied the source code of the deduplicate function <https://beam.apache.org/releases/pydoc/2.22.0/_modules/apache_beam/transforms/deduplicate.html#DeduplicatePerKey> from version 2.22.0. After that we unmap the tuple, retrieve the necessary data from the message payload and dump the corresponding data into the log.
                                                            >      >
                                                            >      > Pipeline:
                                                            >      >
                                                            >      > Flink configuration:
                                                            >      >
                                                            >      > As we mentioned before, we noticed that the memory usage of the jobmanager and taskmanager pods is steadily increasing with no drops, no matter what the current bandwidth is. We tried allocating more memory but it seems like no matter how much memory we allocate, it eventually reaches its limit and then it tries to restart itself.
                                                            >      >
                                                            >      > Sincerely,
                                                            >      > David
