Hi Kenn,

OK, so if we introduce the annotation, we can have stateful ParDo with sorting; that would perfectly resolve my issues. I still have some doubts, though. Let me explain. The current behavior of stateful ParDo has the following properties:

 a) it might fail in batch, although it runs fine in streaming (that is due to the buffering and the unbounded lateness in batch, which was discussed back and forth in this thread)

 b) it might be non-deterministic (because elements arrive in a somewhat random order, so even an operation like "assign a unique ID to each element" might produce different results when run multiple times)

What worries me most is property b), because it seems to have serious consequences - not only would running a batch pipeline twice produce different results, but even in streaming, when a pipeline fails and gets restarted from a checkpoint, the produced output might differ from the previous run, and data from the first run might already have been persisted into the sink. That would create somewhat messy outputs.

These two properties make me think that the current implementation is more of a _special case_ than the general one. The general case is that your state does not have the properties needed to tolerate buffering problems and/or non-determinism - which is exactly the case where you need sorting, in both streaming and batch, to be part of the model.

Let me point out one more analogy - that is merging vs. non-merging windows. The general case (merging windows) implies sorting by timestamp in both the batch case (explicit) and the streaming case (buffering). The special case (non-merging windows) doesn't rely on any timestamp ordering, so the sorting and buffering can be dropped. The underlying root cause is the same for both stateful ParDo and windowing (essentially, assigning window labels is a stateful operation when the windowing function is merging).

The reason for the current behavior of stateful ParDo seems to be performance, but is it right to abandon correctness in favor of performance? Wouldn't it be more consistent to have the default behavior prefer correctness, and when your state function happens to have the special properties, let you annotate your DoFn (with something like @TimeOrderingAgnostic), which would yield better performance in that case?
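
Just to illustrate - a minimal sketch of what I have in mind (the annotation is hypothetical, nothing like it exists in Beam today):

  import java.lang.annotation.ElementType;
  import java.lang.annotation.Retention;
  import java.lang.annotation.RetentionPolicy;
  import java.lang.annotation.Target;

  /**
   * Hypothetical marker: the state updates of the annotated DoFn
   * tolerate arbitrary input order, so a runner may skip any
   * sorting/buffering and deliver elements as they come.
   */
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.TYPE)
  public @interface TimeOrderingAgnostic {}

A runner would check for the annotation during translation of the stateful ParDo and fall back to the (default, correct) sorted delivery when it is absent.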

Jan

On 5/21/19 1:00 AM, Kenneth Knowles wrote:
Thanks for the nice small example of a calculation that depends on order. You are right that many state machines have this property. I agree w/ you and Luke that it is convenient for batch processing to sort by event timestamp before running a stateful ParDo. In streaming you could also implement "sort by event timestamp" by buffering until you know all earlier data will be dropped - a slack buffer up to allowed lateness.

I do not think that it is OK to sort in batch and not in streaming. Many state machines diverge very rapidly when things are out of order. So any runner that sees the "@OrderByTimestamp" annotation (or whatever) needs to deliver sorted data (by some mix of buffering and dropping), or to reject the pipeline as unsupported.

And I also want to say that this is not the default case - many uses of state & timers in ParDo yield different results at the element level, but the results are equivalent in the big picture. In examples like "assign a unique sequence number to each element" or "group into batches", it doesn't matter exactly what the result is, only that it meets the spec. And other cases, like user funnels, are monotonic enough that you also don't actually need sorting.
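
For instance, for the sequence-number case (a sketch only, the class name is mine): any arrival order produces output that meets the spec, even though the concrete numbering differs from run to run.

  import org.apache.beam.sdk.state.StateSpec;
  import org.apache.beam.sdk.state.StateSpecs;
  import org.apache.beam.sdk.state.ValueState;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.values.KV;

  class AssignSequenceNumbers extends DoFn<KV<String, String>, KV<String, Long>> {
    @StateId("next")
    private final StateSpec<ValueState<Long>> nextSpec = StateSpecs.value();

    @ProcessElement
    public void process(ProcessContext ctx, @StateId("next") ValueState<Long> next) {
      // whichever element arrives first simply takes the next free number
      long n = next.read() == null ? 0L : next.read();
      ctx.output(KV.of(ctx.element().getKey(), n));
      next.write(n + 1);
    }
  }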

Kenn

On Mon, May 20, 2019 at 2:59 PM Jan Lukavský <[email protected]> wrote:

    Yes, the problem will probably arise mostly when you have poorly
    distributed keys (or too few keys). I'm really not sure if a pure
    GBK with a trigger can solve this - it might help to have a
    data-driven trigger. There would still be some doubts, though. The
    main question is still here - people say that sorting by timestamp
    before a stateful ParDo would be prohibitively slow, but I don't
    really see why - the sorting is very probably already there. And
    if not (hash grouping instead of sorted grouping), then the
    sorting would affect only user-defined stateful ParDos.

    This would suggest that the best way out of this would really be
    to add the annotation, so that the author of the pipeline can decide.

    If that would be acceptable, I think I can try to prepare some
    basic functionality, but I'm not sure if I would be able to cover
    all runners / SDKs.

    On 5/20/19 11:36 PM, Lukasz Cwik wrote:
    It is "read all per key and window", not just "read all" (this
    still won't scale with hot keys in the global window). The GBK
    preceding the StatefulParDo will guarantee that you are
    processing all the values for a specific key and window at any
    given time. Is there a specific window/trigger that is missing
    that you feel would remove the need for you to use StatefulParDo?

    On Mon, May 20, 2019 at 12:54 PM Jan Lukavský <[email protected]> wrote:

        Hi Lukasz,

        > Today, if you must have a strict order, you must guarantee
        that your StatefulParDo implements the necessary "buffering &
        sorting" into state.

        Yes, no problem with that. But this whole discussion started
        because *this doesn't work in batch*. You simply cannot first
        read everything from distributed storage and then buffer it
        all into memory, just to read it again, but sorted. That will
        not work. And even if it did, it would be a terrible waste
        of resources.

        Jan

        On 5/20/19 8:39 PM, Lukasz Cwik wrote:


        On Mon, May 20, 2019 at 8:24 AM Jan Lukavský <[email protected]> wrote:

            This discussion brings many really interesting questions
            for me. :-)

             > I don't see batch vs. streaming as part of the model.
            One can have
            microbatch, or even a runner that alternates between
            different modes.

            Although I understand the motivation of this statement,
            this project's name is "Apache Beam: An advanced unified
            programming model". What does the model unify, if
            "streaming vs. batch" is not part of the model?

            Using microbatching, chaining batch jobs, or pure
            streaming are exactly the "runtime
            conditions/characteristics" I refer to. All of these
            define several runtime parameters, which in turn determine
            how well or badly the pipeline will perform and how many
            resources might be needed. From my point of view, pure
            streaming should be the most resource-demanding (if not,
            why bother with batch? why not run everything in
            streaming only? what would there remain to "unify"?).

             > Fortunately, for batch, only the state for a single
            key needs to be
            preserved at a time, rather than the state for all keys
            across the range
            of skew. Of course if you have few or hot keys, one can
            still have
            issues (and this is not specific to StatefulDoFns).

            Yes, but there is still the presumption that my stateful
            DoFn can tolerate arbitrary shuffling of inputs. Let me
            explain the use case in more detail.

            Suppose you have an input stream consisting of 1s and 0s
            (and some key for each element, which is irrelevant for
            the demonstration). Your task is to calculate, in a
            running global window, the actual number of changes
            between state 0 and state 1 and vice versa. When the
            state doesn't change, you don't count anything. If the
            input (for a given key) were (tN denotes timestamp N):

              t1: 1

              t2: 0

              t3: 0

              t4: 1

              t5: 1

              t6: 0

            then the output should be (supposing that the default
            state is zero):

              t1: (one: 1, zero: 0)

              t2: (one: 1, zero: 1)

              t3: (one: 1, zero: 1)

              t4: (one: 2, zero: 1)

              t5: (one: 2, zero: 1)

              t6: (one: 2, zero: 2)

            How would you implement this in current Beam semantics?
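
            For reference, the straightforward stateful DoFn (just a
            sketch) is below - but note that it produces the output
            above only if elements arrive ordered by timestamp, which
            is exactly the guarantee that is missing:

              import org.apache.beam.sdk.state.StateSpec;
              import org.apache.beam.sdk.state.StateSpecs;
              import org.apache.beam.sdk.state.ValueState;
              import org.apache.beam.sdk.transforms.DoFn;
              import org.apache.beam.sdk.values.KV;

              class CountTransitions
                  extends DoFn<KV<String, Integer>, KV<String, String>> {
                @StateId("last")
                private final StateSpec<ValueState<Integer>> lastSpec =
                    StateSpecs.value();
                @StateId("ones")
                private final StateSpec<ValueState<Long>> onesSpec =
                    StateSpecs.value();
                @StateId("zeros")
                private final StateSpec<ValueState<Long>> zerosSpec =
                    StateSpecs.value();

                @ProcessElement
                public void process(
                    ProcessContext ctx,
                    @StateId("last") ValueState<Integer> last,
                    @StateId("ones") ValueState<Long> ones,
                    @StateId("zeros") ValueState<Long> zeros) {
                  int prev = orElse(last.read(), 0);   // default state is 0
                  int cur = ctx.element().getValue();
                  long toOne = orElse(ones.read(), 0L);
                  long toZero = orElse(zeros.read(), 0L);
                  if (prev == 0 && cur == 1) {
                    toOne++;      // observed a 0 -> 1 change
                  } else if (prev == 1 && cur == 0) {
                    toZero++;     // observed a 1 -> 0 change
                  }
                  last.write(cur);
                  ones.write(toOne);
                  zeros.write(toZero);
                  ctx.output(KV.of(ctx.element().getKey(),
                      "(one: " + toOne + ", zero: " + toZero + ")"));
                }

                private static <T> T orElse(T value, T def) {
                  return value == null ? def : value;
                }
              }

            Feed it the same input shuffled and the counters diverge
            immediately.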

        I think what you're saying here is: "I know that my input is
        ordered in a specific way, and since I assume that order when
        writing my pipeline, I can perform this optimization." But
        there is nothing preventing a runner from noticing that you're
        processing in the global window with a specific type of
        trigger and re-ordering your inputs/processing to get better
        performance (since you can't use an AfterWatermark trigger
        for your pipeline in streaming for the GlobalWindow).

        Today, if you must have a strict order, you must guarantee
        that your StatefulParDo implements the necessary "buffering
        & sorting" into state. I can see why you would want an
        annotation that says "I must have timestamp-ordered elements",
        since it makes writing certain StatefulParDos much easier.
        StatefulParDo is a low-level function; it really is the
        "here you go and do whatever you need to, but here be
        dragons" function, while windowing and triggering are meant to
        keep many people from writing StatefulParDo in the first place.

             > Pipelines that fail in the "worst case" batch
            scenario are likely to
            degrade poorly (possibly catastrophically) when the
            watermark falls
            behind in streaming mode as well.

            But the worst case is defined by an input of size
            (available resources + a single byte) -> pipeline failure,
            although it could have finished, given the right
            conditions.

             > This might be reasonable, implemented by default by
            buffering
            everything and releasing elements as the watermark
            (+lateness) advances,
            but would likely lead to inefficient (though *maybe*
            easier to reason
            about) code.

            Sure, the pipeline would be less efficient, because it
            would have to buffer and sort the inputs. But at least it
            would produce correct results in cases where updates to
            state are order-sensitive.

             > Would it be roughly equivalent to GBK +
            FlatMap(lambda (key, values):
            [(key, value) for value in values])?

            I'd say roughly yes, but the difference would be in the
            trigger. The trigger should ideally fire as soon as the
            watermark (+lateness) passes the element with the lowest
            timestamp in the buffer, although this could be somewhat
            emulated by a fixed trigger firing every X millis.
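
            A rough sketch of that emulation (the trigger period and
            element types here are illustrative only):

              import org.joda.time.Duration;
              import org.apache.beam.sdk.transforms.GroupByKey;
              import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
              import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
              import org.apache.beam.sdk.transforms.windowing.Repeatedly;
              import org.apache.beam.sdk.transforms.windowing.Window;
              import org.apache.beam.sdk.values.KV;
              import org.apache.beam.sdk.values.PCollection;

              class PeriodicBuffer {
                static PCollection<KV<String, Iterable<Integer>>> expand(
                    PCollection<KV<String, Integer>> keyed) {
                  return keyed
                      .apply(Window.<KV<String, Integer>>into(new GlobalWindows())
                          .triggering(Repeatedly.forever(
                              AfterProcessingTime.pastFirstElementInPane()
                                  .plusDelayOf(Duration.millis(100))))  // the "X millis"
                          .discardingFiredPanes()
                          .withAllowedLateness(Duration.ZERO))
                      .apply(GroupByKey.create());
                }
              }

            To actually sort each pane by element timestamp, one would
            additionally need something like Reify.timestampsInValue()
            upstream of the GBK, so the timestamps survive the
            grouping.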

             > Or is the underlying desire just to be able to hint
            to the runner
            that the code may perform better (e.g. require less
            resources) as skew
            is reduced (and hence to order by timestamp iff it's cheap)?

            No, the sorting would have to be done in the streaming
            case as well; that is an imperative of the unified model.
            I think it is possible either to sort by timestamp only in
            the batch case (and do it for *all* batch stateful
            ParDos, without any annotation), or to introduce the
            annotation, but then make the same guarantees for the
            streaming case as well.

            Jan

            On 5/20/19 4:41 PM, Robert Bradshaw wrote:
            > On Mon, May 20, 2019 at 1:19 PM Jan Lukavský <[email protected]> wrote:
            >> Hi Robert,
            >>
            >> yes, I think you rephrased my point - although no
            >> *explicit* guarantees of ordering are given in either
            >> mode, there is *implicit* ordering in the streaming
            >> case that is due to the nature of the processing - the
            >> difference between the watermark and the timestamps of
            >> elements flowing through the pipeline is generally low
            >> (too high a difference leads to the overbuffering
            >> problem), but there is no such bound in batch.
            > Fortunately, for batch, only the state for a single
            key needs to be
            > preserved at a time, rather than the state for all
            keys across the
            > range of skew. Of course if you have few or hot keys,
            one can still
            > have issues (and this is not specific to StatefulDoFns).
            >
            >> As a result, I see a few possible solutions:
            >>
            >>    - the best and most natural option seems to be an
            >> extension of the model, so that it defines batch not
            >> only as "streaming pipeline executed in batch fashion",
            >> but as "pipeline with at least as good runtime
            >> characteristics as in the streaming case, executed in
            >> batch fashion". I really don't think that there are any
            >> conflicts with the current model, or that this could
            >> affect performance, because the required sorting (as
            >> pointed out by Aljoscha) is very probably already done
            >> during translation of stateful pardos. Also note that
            >> this definition only affects user-defined stateful
            >> pardos
            > I don't see batch vs. streaming as part of the model.
            One can have
            > microbatch, or even a runner that alternates between
            different modes.
            > The model describes what the valid outputs are given a
            (sometimes
            > partial) set of inputs. It becomes really hard to
            define things like
            > "as good runtime characteristics." Once you allow any
            > out-of-orderedness, it is not very feasible to try and
            define (and
            > more cheaply implement) an "upper bound" of acceptable
            > out-of-orderedness.
            >
            > Pipelines that fail in the "worst case" batch scenario
            are likely to
            > degrade poorly (possibly catastrophically) when the
            watermark falls
            > behind in streaming mode as well.
            >
            >>    - another option would be to introduce an
            >> annotation for DoFns (e.g.
            >> @RequiresStableTimeCharacteristics), which would result
            >> in the sorting in the batch case - but - this extension
            >> would have to ensure the sorting in streaming mode
            >> also - it would require a definition of allowed
            >> lateness, and a trigger (essentially similar to a
            >> window)
            > This might be reasonable, implemented by default by
            buffering
            > everything and releasing elements as the watermark
            (+lateness)
            > advances, but would likely lead to inefficient (though
            *maybe* easier
            > to reason about) code. Not sure about the semantics of
            triggering
            > here, especially data-driven triggers. Would it be
            roughly equivalent
            > to GBK + FlatMap(lambda (key, values): [(key, value)
            for value in
            > values])?
            >
            > Or is the underlying desire just to be able to hint to
            the runner that
            > the code may perform better (e.g. require less
            resources) as skew is
            > reduced (and hence to order by timestamp iff it's cheap)?
            >
            >>    - the last option would be to introduce these
            >> "higher order guarantees" in some extension DSL (e.g.
            >> Euphoria), but that seems to be the worst option to me
            >>
            >> I see the first two options as quite equally good,
            >> although the latter one is probably more time-consuming
            >> to implement. But it would bring an additional feature
            >> to the streaming case as well.
            >>
            >> Thanks for any thoughts.
            >>
            >>    Jan
            >>
            >> On 5/20/19 12:41 PM, Robert Bradshaw wrote:
            >>> On Fri, May 17, 2019 at 4:48 PM Jan Lukavský <[email protected]> wrote:
            >>>> Hi Reuven,
            >>>>
            >>>>> How so? AFAIK stateful DoFns work just fine in
            batch runners.
            >>>> Stateful ParDo works in batch as far as the logic
            >>>> inside the state works for absolutely unbounded
            >>>> out-of-orderness of elements. That basically
            >>>> (practically) can work only for cases where the order
            >>>> of input elements doesn't matter. But "state" can
            >>>> refer to a "state machine", and any time you have a
            >>>> state machine involved, the ordering of elements
            >>>> matters.
            >>> No guarantees on order are provided in *either*
            >>> streaming or batch mode by the model. However, it is
            >>> the case that in order to make forward progress most
            >>> streaming runners attempt to limit the amount of
            >>> out-of-orderedness of elements (in terms of event
            >>> time vs. processing time), which in turn could help
            >>> cap the amount of state that must be held
            >>> concurrently, whereas a batch runner may not allow
            >>> any state to be safely discarded until the whole
            >>> timeline from infinite past to infinite future has
            >>> been observed.
            >>>
            >>> Also, as pointed out, state is not preserved "batch
            to batch" in batch mode.
            >>>
            >>>
            >>> On Thu, May 16, 2019 at 3:59 PM Maximilian Michels <[email protected]> wrote:
            >>>
            >>>>>    batch semantics and streaming semantics differ
            >>>>> only in that I can have GlobalWindow with default
            >>>>> trigger in batch and cannot in streaming
            >>>> You can have a GlobalWindow in streaming with a
            default trigger. You
            >>>> could define additional triggers that do early
            firings. And you could
            >>>> even trigger the global window by advancing the
            watermark to +inf.
            >>> IIRC, as a pragmatic note, we prohibited global
            window with default
            >>> trigger on unbounded PCollections in the SDK because
            this is more
            >>> likely to be user error than an actual desire to
            have no output until
            >>> drain. But it's semantically valid in the model.
