Yes, the problem will probably arise mostly when the keys are not well distributed (or there are too few keys). I'm really not sure a pure GBK with a trigger can solve this - a data-driven trigger might help, but there would still be some doubts. The main question still stands - people say that sorting by timestamp before a stateful ParDo would be prohibitively slow, but I don't really see why - the sorting is very probably already there. And if not (hash grouping instead of sorted grouping), then the sorting would affect only user-defined stateful ParDos.

This suggests that the best way out of this would really be to add an annotation, so that the author of the pipeline can decide.

If that is acceptable, I think I can try to prepare some basic functionality, but I'm not sure I would be able to cover all runners / SDKs.
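
To make the opt-in concrete, it could look roughly like the sketch below, reusing the @RequiresStableTimeCharacteristics name mentioned earlier in this thread (quoted below) purely as a placeholder - neither the annotation nor its semantics exist in Beam today, and the real name and guarantees are exactly what would need to be decided:

  // Hypothetical annotation - it does not exist in Beam; it only shows how a
  // pipeline author could opt in to timestamp-ordered input for a stateful DoFn.
  @RequiresStableTimeCharacteristics
  class TransitionCountFn extends DoFn<KV<String, Integer>, KV<String, String>> {
    // stateful logic here could assume elements arrive ordered by timestamp,
    // in both batch and streaming execution
  }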

On 5/20/19 11:36 PM, Lukasz Cwik wrote:
It is read all per key and window and not just read all (this still won't scale with hot keys in the global window). The GBK preceding the StatefulParDo will guarantee that you are processing all the values for a specific key and window at any given time. Is there a specific window/trigger that is missing that you feel would remove the need for you to use StatefulParDo?

On Mon, May 20, 2019 at 12:54 PM Jan Lukavský <[email protected]> wrote:

    Hi Lukasz,

    > Today, if you must have a strict order, you must guarantee that
    your StatefulParDo implements the necessary "buffering & sorting"
    into state.

    Yes, no problem with that. But this whole discussion started
    because *this doesn't work in batch*. You simply cannot first read
    everything from distributed storage and then buffer it all into
    memory, just to read it again, but sorted. That will not work. And
    even if it did, it would be a terrible waste of resources.

    Jan

    On 5/20/19 8:39 PM, Lukasz Cwik wrote:


    On Mon, May 20, 2019 at 8:24 AM Jan Lukavský <[email protected]> wrote:

        This discussion brings many really interesting questions for
        me. :-)

         > I don't see batch vs. streaming as part of the model. One
        can have
        microbatch, or even a runner that alternates between
        different modes.

        Although I understand the motivation behind this statement,
        the project's name is
        "Apache Beam: An advanced unified programming model". What
        does the
        model unify, if "streaming vs. batch" is not part of the model?

        Using microbatching, chaining of batch jobs, or pure
        streaming are
        exactly the "runtime conditions/characteristics" I refer to.
        All these
        define several runtime parameters, which in turn define how
        well/badly
        the pipeline will perform and how many resources might be
        needed. From
        my point of view, pure streaming should be the most resource
        demanding
        (if not, why bother with batch? why not run everything in
        streaming
        only? what would remain to "unify"?).

         > Fortunately, for batch, only the state for a single key
        needs to be
        preserved at a time, rather than the state for all keys
        across the range
        of skew. Of course if you have few or hot keys, one can still
        have
        issues (and this is not specific to StatefulDoFns).

        Yes, but there is still the presumption that my stateful DoFn can
        tolerate arbitrary shuffling of inputs. Let me explain the
        use case in
        more detail.

        Suppose you have an input stream consisting of 1s and 0s (and
        some key for
        each element, which is irrelevant for the demonstration).
        Your task is
        to calculate, in a running global window, the actual number of
        changes
        between state 0 and state 1 and vice versa. When the state
        doesn't
        change, you don't calculate anything. If the input (for a given
        key) were
        (tN denotes timestamp N):

          t1: 1

          t2: 0

          t3: 0

          t4: 1

          t5: 1

          t6: 0

        then the output should yield (supposing that the default state is
        zero):

          t1: (one: 1, zero: 0)

          t2: (one: 1, zero: 1)

          t3: (one: 1, zero: 1)

          t4: (one: 2, zero: 1)

          t5: (one: 2, zero: 1)

          t6: (one: 2, zero: 2)

        How would you implement this in current Beam semantics?
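
        For illustration, below is roughly how I would write it *if* I
        could rely on timestamp order. This is only a sketch - the class
        name, the Integer element type and the String output format are
        invented for this example - but it is a correct transition
        counter under that ordering assumption:

        import org.apache.beam.sdk.coders.VarIntCoder;
        import org.apache.beam.sdk.coders.VarLongCoder;
        import org.apache.beam.sdk.state.StateSpec;
        import org.apache.beam.sdk.state.StateSpecs;
        import org.apache.beam.sdk.state.ValueState;
        import org.apache.beam.sdk.transforms.DoFn;
        import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
        import org.apache.beam.sdk.transforms.DoFn.StateId;
        import org.apache.beam.sdk.values.KV;

        // Counts 0 -> 1 and 1 -> 0 transitions per key. Correct only if
        // elements arrive ordered by timestamp, which batch does not guarantee.
        class CountTransitionsFn extends DoFn<KV<String, Integer>, KV<String, String>> {

          @StateId("last")
          private final StateSpec<ValueState<Integer>> lastSpec =
              StateSpecs.value(VarIntCoder.of());

          @StateId("ones")
          private final StateSpec<ValueState<Long>> onesSpec =
              StateSpecs.value(VarLongCoder.of());

          @StateId("zeros")
          private final StateSpec<ValueState<Long>> zerosSpec =
              StateSpecs.value(VarLongCoder.of());

          @ProcessElement
          public void process(
              ProcessContext ctx,
              @StateId("last") ValueState<Integer> last,
              @StateId("ones") ValueState<Long> ones,
              @StateId("zeros") ValueState<Long> zeros) {
            Integer lastSeen = last.read();
            int previous = lastSeen == null ? 0 : lastSeen;   // default state is zero
            Long onesSoFar = ones.read();
            long oneCount = onesSoFar == null ? 0L : onesSoFar;
            Long zerosSoFar = zeros.read();
            long zeroCount = zerosSoFar == null ? 0L : zerosSoFar;
            int current = ctx.element().getValue();
            if (current != previous) {
              if (current == 1) {
                oneCount++;           // 0 -> 1 transition
                ones.write(oneCount);
              } else {
                zeroCount++;          // 1 -> 0 transition
                zeros.write(zeroCount);
              }
              last.write(current);
            }
            ctx.output(KV.of(
                ctx.element().getKey(),
                "(one: " + oneCount + ", zero: " + zeroCount + ")"));
          }
        }

        With ordered input this reproduces the outputs above. In batch,
        where the runner may feed the per-key elements in arbitrary
        order, the very same code silently computes wrong transition
        counts - which is exactly the problem I'm describing.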

    I think you're saying here that I know that my input is ordered in
    a specific way and since I assume the order when writing my
    pipeline I can perform this optimization. But there is nothing
    preventing a runner from noticing that you're processing in the
    global window with a specific type of trigger and re-ordering
    your inputs/processing to get better performance (since you can't
    use an AfterWatermark trigger for your pipeline in streaming for
    the GlobalWindow).

    Today, if you must have a strict order, you must guarantee that
    your StatefulParDo implements the necessary "buffering & sorting"
    into state. I can see why you would want an annotation that says
    I must have timestamp-ordered elements, since it makes writing
    certain StatefulParDos much easier. StatefulParDo is a low-level
    function; it really is the "here you go and do whatever you need
    to, but here be dragons" function, while windowing and triggering
    are meant to keep many people from writing StatefulParDo in the
    first place.
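
    A minimal sketch of that "buffering & sorting into state" pattern
    is below (illustrative only: the class and state names are
    invented, and allowed lateness and output timestamps are glossed
    over). Elements are stashed in a BagState and flushed in timestamp
    order when an event-time timer fires at the end of the window:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.beam.sdk.coders.KvCoder;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.coders.VarIntCoder;
    import org.apache.beam.sdk.state.BagState;
    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.TimeDomain;
    import org.apache.beam.sdk.state.Timer;
    import org.apache.beam.sdk.state.TimerSpec;
    import org.apache.beam.sdk.state.TimerSpecs;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.DoFn.OnTimer;
    import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
    import org.apache.beam.sdk.transforms.DoFn.StateId;
    import org.apache.beam.sdk.transforms.DoFn.TimerId;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.TimestampedValue;

    // Buffers all elements for a key/window in state and re-emits them
    // sorted by timestamp once the watermark reaches the end of the window.
    class SortBufferedInputFn extends DoFn<KV<String, Integer>, KV<String, Integer>> {

      @StateId("buffer")
      private final StateSpec<BagState<TimestampedValue<KV<String, Integer>>>> bufferSpec =
          StateSpecs.bag(TimestampedValue.TimestampedValueCoder.of(
              KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())));

      @TimerId("flush")
      private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

      @ProcessElement
      public void process(
          ProcessContext ctx,
          BoundedWindow window,
          @StateId("buffer") BagState<TimestampedValue<KV<String, Integer>>> buffer,
          @TimerId("flush") Timer flush) {
        buffer.add(TimestampedValue.of(ctx.element(), ctx.timestamp()));
        // Fire once the watermark passes the end of the window.
        flush.set(window.maxTimestamp());
      }

      @OnTimer("flush")
      public void flush(
          OnTimerContext ctx,
          @StateId("buffer") BagState<TimestampedValue<KV<String, Integer>>> buffer) {
        List<TimestampedValue<KV<String, Integer>>> sorted = new ArrayList<>();
        for (TimestampedValue<KV<String, Integer>> element : buffer.read()) {
          sorted.add(element);
        }
        sorted.sort((a, b) -> a.getTimestamp().compareTo(b.getTimestamp()));
        for (TimestampedValue<KV<String, Integer>> element : sorted) {
          ctx.output(element.getValue());   // downstream now sees timestamp order
        }
        buffer.clear();
      }
    }

    Note that with the GlobalWindow in batch this amounts to buffering
    every element for a key in state before anything can be emitted,
    which is exactly the concern Jan raises in his reply above.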

         > Pipelines that fail in the "worst case" batch scenario are
        likely to
        degrade poorly (possibly catastrophically) when the watermark
        falls
        behind in streaming mode as well.

        But the worst case is defined by an input of size (available
        resources +
        a single byte) -> pipeline failure. Although it could have
        finished, given
        the right conditions.

         > This might be reasonable, implemented by default by buffering
        everything and releasing elements as the watermark
        (+lateness) advances,
        but would likely lead to inefficient (though *maybe* easier
        to reason
        about) code.

        Sure, the pipeline will be less efficient, because it would
        have to
        buffer and sort the inputs. But at least it will produce
        correct results
        in cases where updates to state are order-sensitive.

         > Would it be roughly equivalent to GBK + FlatMap(lambda
        (key, values):
        [(key, value) for value in values])?

        I'd say roughly yes, but the difference would be in the trigger.
        The trigger
        should ideally fire as soon as the watermark (+lateness) crosses
        the element
        with the lowest timestamp in the buffer, although this could be
        somewhat
        emulated by a fixed trigger firing every X millis.
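
        For reference, the GBK + FlatMap expansion Robert mentions would
        look roughly like this in Java (a sketch only - it assumes an
        input PCollection<KV<String, Integer>> named "input" and the
        usual Beam imports, and it applies no sorting or custom trigger):

        // Group per key and window, then re-emit the grouped values one by one.
        PCollection<KV<String, Integer>> reEmitted =
            input
                .apply(GroupByKey.<String, Integer>create())
                .apply(ParDo.of(
                    new DoFn<KV<String, Iterable<Integer>>, KV<String, Integer>>() {
                      @ProcessElement
                      public void process(ProcessContext ctx) {
                        for (Integer value : ctx.element().getValue()) {
                          ctx.output(KV.of(ctx.element().getKey(), value));
                        }
                      }
                    }));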

         > Or is the underlying desire just to be able to hint to the
        runner
        that the code may perform better (e.g. require less
        resources) as skew
        is reduced (and hence to order by timestamp iff it's cheap)?

        No, the sorting would have to be done in the streaming case as
        well. That is
        an imperative of the unified model. I think it is possible either
        to sort by
        timestamp only in the batch case (and do it for *all* batch
        stateful pardos
        without an annotation), or to introduce an annotation, but then make
        the same
        guarantees for the streaming case as well.

        Jan

        On 5/20/19 4:41 PM, Robert Bradshaw wrote:
        > On Mon, May 20, 2019 at 1:19 PM Jan Lukavský
        <[email protected] <mailto:[email protected]>> wrote:
        >> Hi Robert,
        >>
        >> yes, I think you rephrased my point - although no
        *explicit* guarantees
        >> of ordering are given in either mode, there is *implicit*
        ordering in
        >> the streaming case, which is due to the nature of the processing -
        the difference
        >> between the watermark and the timestamps of elements flowing
        through the pipeline
        >> is generally low (too high a difference leads to the
        overbuffering
        >> problem), but there is no such bound in batch.
        > Fortunately, for batch, only the state for a single key
        needs to be
        > preserved at a time, rather than the state for all keys
        across the
        > range of skew. Of course if you have few or hot keys, one
        can still
        > have issues (and this is not specific to StatefulDoFns).
        >
        >> As a result, I see a few possible solutions:
        >>
        >>    - the best and most natural seems to be an extension of
        the model, so
        >> that it defines batch as not only "streaming pipeline
        executed in batch
        >> fashion", but "pipeline with at least as good runtime
        characteristics as
        >> in the streaming case, executed in batch fashion". I really
        don't think that
        >> there are any conflicts with the current model, or that
        this could
        >> affect performance, because the required sorting (as
        pointed out by
        >> Aljoscha) is very probably already done during translation
        of stateful
        >> pardos. Also note that this definition only affects user
        defined
        >> stateful pardos
        > I don't see batch vs. streaming as part of the model. One
        can have
        > microbatch, or even a runner that alternates between
        different modes.
        > The model describes what the valid outputs are given a
        (sometimes
        > partial) set of inputs. It becomes really hard to define
        things like
        > "as good runtime characteristics." Once you allow any
        > out-of-orderedness, it is not very feasible to try and
        define (and
        > more cheaply implement) an "upper bound" of acceptable
        > out-of-orderedness.
        >
        > Pipelines that fail in the "worst case" batch scenario are
        likely to
        > degrade poorly (possibly catastrophically) when the
        watermark falls
        > behind in streaming mode as well.
        >
        >>    - another option would be to introduce an annotation for
        DoFns (e.g.
        >> @RequiresStableTimeCharacteristics), which would result in
        the sorting
        >> in the batch case - but this extension would have to ensure
        the sorting in
        >> streaming mode also - it would require a definition of
        allowed lateness,
        >> and trigger (essentially similar to a window)
        > This might be reasonable, implemented by default by buffering
        > everything and releasing elements as the watermark (+lateness)
        > advances, but would likely lead to inefficient (though
        *maybe* easier
        > to reason about) code. Not sure about the semantics of
        triggering
        > here, especially data-driven triggers. Would it be roughly
        equivalent
        > to GBK + FlatMap(lambda (key, values): [(key, value) for
        value in
        > values])?
        >
        > Or is the underlying desire just to be able to hint to the
        runner that
        > the code may perform better (e.g. require less resources)
        as skew is
        > reduced (and hence to order by timestamp iff it's cheap)?
        >
        >>    - the last option would be to introduce these "higher-order
        guarantees" in
        >> some extension DSL (e.g. Euphoria), but that seems to be
        the worst
        >> option to me
        >>
        >> I see the first two options quite equally good, although
        the latter one
        >> is probably more time consuming to implement. But it would
        bring
        >> an additional feature to the streaming case as well.
        >>
        >> Thanks for any thoughts.
        >>
        >>    Jan
        >>
        >> On 5/20/19 12:41 PM, Robert Bradshaw wrote:
        >>> On Fri, May 17, 2019 at 4:48 PM Jan Lukavský
        <[email protected] <mailto:[email protected]>> wrote:
        >>>> Hi Reuven,
        >>>>
        >>>>> How so? AFAIK stateful DoFns work just fine in batch
        runners.
        >>>> Stateful ParDo works in batch only as far as the logic
        inside the state works for absolutely unbounded
        out-of-orderness of elements. That basically (practically)
        can work only for cases where the order of input elements
        doesn't matter. But "state" can refer to a "state machine",
        and any time you have a state machine involved, the
        ordering of elements matters.
        >>> No guarantees on order are provided in *either* streaming
        or batch
        >>> mode by the model. However, it is the case that most
        streaming runners
        >>> attempt to limit the amount of out-of-orderedness of
        elements (in terms
        >>> of event time vs. processing time) in order to make forward
        progress, which in turn could help
        cap the
        >>> amount of state that must be held concurrently, whereas a
        batch runner
        >>> may not allow any state to be safely discarded until the
        whole
        >>> timeline from infinite past to infinite future has been
        observed.
        >>>
        >>> Also, as pointed out, state is not preserved "batch to
        batch" in batch mode.
        >>>
        >>>
        >>> On Thu, May 16, 2019 at 3:59 PM Maximilian Michels
        <[email protected] <mailto:[email protected]>> wrote:
        >>>
        >>>>>    batch semantics and streaming semantics differ only
        in that I can have GlobalWindow with default trigger on batch
        and cannot on stream
        >>>> You can have a GlobalWindow in streaming with a default
        trigger. You
        >>>> could define additional triggers that do early firings.
        And you could
        >>>> even trigger the global window by advancing the
        watermark to +inf.
        >>> IIRC, as a pragmatic note, we prohibited global window
        with default
        >>> trigger on unbounded PCollections in the SDK because this
        is more
        >>> likely to be user error than an actual desire to have no
        output until
        >>> drain. But it's semantically valid in the model.
