Hi Lukasz,

> Today, if you must have a strict order, you must guarantee that your StatefulParDo implements the necessary "buffering & sorting" into state.

Yes, no problem with that. But this whole discussion started because *this doesn't work in batch*. You simply cannot first read everything from distributed storage, buffer it all into memory, and then read it again sorted. That will not work, and even if it did, it would be a terrible waste of resources.

Jan

On 5/20/19 8:39 PM, Lukasz Cwik wrote:


On Mon, May 20, 2019 at 8:24 AM Jan Lukavský <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:

    This discussion brings many really interesting questions for me. :-)

     > I don't see batch vs. streaming as part of the model. One can have
     > microbatch, or even a runner that alternates between different modes.

    Although I understand the motivation of this statement, the name of this
    project is "Apache Beam: An advanced unified programming model". What
    does the model unify, if "streaming vs. batch" is not part of the model?

    Using microbatching, chaining of batch jobs, or pure streaming is exactly
    the kind of "runtime conditions/characteristics" I refer to. All of these
    define several runtime parameters, which in turn define how well or badly
    the pipeline will perform and how many resources might be needed. From my
    point of view, pure streaming should be the most resource demanding (if
    not, why bother with batch? why not run everything in streaming only?
    what would there remain to "unify"?).

     > Fortunately, for batch, only the state for a single key needs to be
     > preserved at a time, rather than the state for all keys across the
     > range of skew. Of course if you have few or hot keys, one can still
     > have issues (and this is not specific to StatefulDoFns).

    Yes, but there is still the presumption that my stateful DoFn can
    tolerate arbitrary shuffling of inputs. Let me explain the use case in
    more detail.

    Suppose you have an input stream consisting of 1s and 0s (and some key
    for each element, which is irrelevant for the demonstration). Your task
    is to calculate, in a running global window, the actual number of changes
    between state 0 and state 1 and vice versa. When the state doesn't
    change, you don't count anything. If the input (for a given key) were
    (tN denotes timestamp N):

      t1: 1
      t2: 0
      t3: 0
      t4: 1
      t5: 1
      t6: 0

    then the output should yield (supposing the default state is zero):

      t1: (one: 1, zero: 0)
      t2: (one: 1, zero: 1)
      t3: (one: 1, zero: 1)
      t4: (one: 2, zero: 1)
      t5: (one: 2, zero: 1)
      t6: (one: 2, zero: 2)

    How would you implement this in current Beam semantics?
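
    To make it concrete, here is a minimal sketch (Java, untested) of the
    stateful DoFn I have in mind. It only produces the output above if
    elements arrive in timestamp order per key - which is exactly what batch
    does not guarantee:

      import org.apache.beam.sdk.state.StateSpec;
      import org.apache.beam.sdk.state.StateSpecs;
      import org.apache.beam.sdk.state.ValueState;
      import org.apache.beam.sdk.transforms.DoFn;
      import org.apache.beam.sdk.values.KV;

      // Counts 0->1 and 1->0 transitions per key. Correct only if elements
      // arrive in timestamp order for each key.
      class CountTransitionsFn extends DoFn<KV<String, Integer>, KV<String, String>> {

        @StateId("last")
        private final StateSpec<ValueState<Integer>> lastSpec = StateSpecs.value();

        @StateId("ones")
        private final StateSpec<ValueState<Long>> onesSpec = StateSpecs.value();

        @StateId("zeros")
        private final StateSpec<ValueState<Long>> zerosSpec = StateSpecs.value();

        @ProcessElement
        public void process(
            ProcessContext ctx,
            @StateId("last") ValueState<Integer> last,
            @StateId("ones") ValueState<Long> ones,
            @StateId("zeros") ValueState<Long> zeros) {
          int current = ctx.element().getValue();
          // The default state is zero.
          Integer prev = last.read();
          int previous = (prev == null) ? 0 : prev;
          Long onesSoFar = ones.read();
          long oneCount = (onesSoFar == null) ? 0L : onesSoFar;
          Long zerosSoFar = zeros.read();
          long zeroCount = (zerosSoFar == null) ? 0L : zerosSoFar;
          if (previous == 0 && current == 1) {
            oneCount++;
            ones.write(oneCount);
          } else if (previous == 1 && current == 0) {
            zeroCount++;
            zeros.write(zeroCount);
          }
          last.write(current);
          ctx.output(KV.of(
              ctx.element().getKey(),
              "(one: " + oneCount + ", zero: " + zeroCount + ")"));
        }
      }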

I think what you're saying here is: "I know that my input is ordered in a specific way, and since I assume that order when writing my pipeline, I can perform this optimization." But there is nothing preventing a runner from noticing that you're processing in the global window with a specific type of trigger and re-ordering your inputs/processing to get better performance (since you can't use an AfterWatermark trigger for your pipeline in streaming for the GlobalWindow).

Today, if you must have a strict order, you must guarantee that your StatefulParDo implements the necessary "buffering & sorting" into state. I can see why you would want an annotation that says "I must have timestamp-ordered elements", since it makes writing certain StatefulParDos much easier. StatefulParDo is a low-level function; it really is the "here you go and do whatever you need to, but here be dragons" function, while windowing and triggering are meant to keep many people from writing StatefulParDo in the first place.
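
For reference, the "buffering & sorting" into state can be sketched with a BagState plus an event-time timer (untested, names made up; allowed-lateness handling and output timestamps are elided from the sketch):

  import java.util.ArrayList;
  import java.util.Comparator;
  import java.util.List;
  import org.apache.beam.sdk.state.BagState;
  import org.apache.beam.sdk.state.StateSpec;
  import org.apache.beam.sdk.state.StateSpecs;
  import org.apache.beam.sdk.state.TimeDomain;
  import org.apache.beam.sdk.state.Timer;
  import org.apache.beam.sdk.state.TimerSpec;
  import org.apache.beam.sdk.state.TimerSpecs;
  import org.apache.beam.sdk.state.ValueState;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.values.KV;
  import org.apache.beam.sdk.values.TimestampedValue;

  // Buffers elements into state and replays them sorted by timestamp once an
  // event-time timer fires. This only illustrates the pattern.
  class SortingBufferFn extends DoFn<KV<String, Integer>, KV<String, Integer>> {

    @StateId("key")
    private final StateSpec<ValueState<String>> keySpec = StateSpecs.value();

    @StateId("buffer")
    private final StateSpec<BagState<TimestampedValue<Integer>>> bufferSpec =
        StateSpecs.bag();

    @TimerId("flush")
    private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

    @ProcessElement
    public void process(
        ProcessContext ctx,
        @StateId("key") ValueState<String> key,
        @StateId("buffer") BagState<TimestampedValue<Integer>> buffer,
        @TimerId("flush") Timer flush) {
      key.write(ctx.element().getKey());
      buffer.add(TimestampedValue.of(ctx.element().getValue(), ctx.timestamp()));
      // Ask to be called back once the watermark passes this element.
      flush.set(ctx.timestamp());
    }

    @OnTimer("flush")
    public void onFlush(
        OnTimerContext ctx,
        @StateId("key") ValueState<String> key,
        @StateId("buffer") BagState<TimestampedValue<Integer>> buffer) {
      List<TimestampedValue<Integer>> sorted = new ArrayList<>();
      buffer.read().forEach(sorted::add);
      sorted.sort(Comparator.comparing(TimestampedValue::getTimestamp));
      for (TimestampedValue<Integer> tv : sorted) {
        // The order-sensitive logic runs here, over elements that are now in
        // timestamp order.
        ctx.output(KV.of(key.read(), tv.getValue()));
      }
      buffer.clear();
    }
  }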

     > Pipelines that fail in the "worst case" batch scenario are likely to
     > degrade poorly (possibly catastrophically) when the watermark falls
     > behind in streaming mode as well.

    But the worst case is defined by an input of size (available resources +
    a single byte) -> pipeline failure, although it could have finished given
    the right conditions.

     > This might be reasonable, implemented by default by buffering
     > everything and releasing elements as the watermark (+lateness)
     > advances, but would likely lead to inefficient (though *maybe* easier
     > to reason about) code.

    Sure, the pipeline will be less efficient, because it would have to
    buffer and sort the inputs. But at least it will produce correct results
    in cases where updates to state are order-sensitive.

     > Would it be roughly equivalent to GBK + FlatMap(lambda (key, values):
     > [(key, value) for value in values])?

    I'd say roughly yes, but the difference would be in the trigger. The
    trigger should ideally fire as soon as the watermark (+lateness) crosses
    the element with the lowest timestamp in the buffer, although this could
    be somewhat emulated by a fixed trigger every X millis.
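
    Just to illustrate what I mean by that emulation (an untested sketch, not
    a recommendation - the per-pane sorting by timestamp would still have to
    happen downstream):

      import org.apache.beam.sdk.transforms.GroupByKey;
      import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
      import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
      import org.apache.beam.sdk.transforms.windowing.Repeatedly;
      import org.apache.beam.sdk.transforms.windowing.Window;
      import org.apache.beam.sdk.values.KV;
      import org.apache.beam.sdk.values.PCollection;
      import org.joda.time.Duration;

      class PeriodicGrouping {
        // Re-groups the input every 100 ms of processing time (the "X millis"
        // is made up). Each fired pane then holds a batch of values for a key;
        // a downstream ParDo would still sort each batch by timestamp before
        // applying the order-sensitive logic.
        static PCollection<KV<String, Iterable<Integer>>> group(
            PCollection<KV<String, Integer>> input) {
          return input
              .apply(
                  Window.<KV<String, Integer>>into(new GlobalWindows())
                      .triggering(
                          Repeatedly.forever(
                              AfterProcessingTime.pastFirstElementInPane()
                                  .plusDelayOf(Duration.millis(100))))
                      .discardingFiredPanes()
                      .withAllowedLateness(Duration.ZERO))
              .apply(GroupByKey.<String, Integer>create());
        }
      }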

     > Or is the underlying desire just to be able to hint to the runner
     > that the code may perform better (e.g. require less resources) as
     > skew is reduced (and hence to order by timestamp iff it's cheap)?

    No, the sorting would have to be done in the streaming case as well. That
    is an imperative of the unified model. I think it is possible either to
    sort by timestamp only in the batch case (and do it for *all* batch
    stateful pardos without any annotation), or to introduce an annotation,
    but then make the same guarantees for the streaming case as well.

    Jan

    On 5/20/19 4:41 PM, Robert Bradshaw wrote:
    > On Mon, May 20, 2019 at 1:19 PM Jan Lukavský <je...@seznam.cz
    <mailto:je...@seznam.cz>> wrote:
    >> Hi Robert,
    >>
    >> yes, I think you rephrased my point - although no *explicit*
    >> guarantees of ordering are given in either mode, there is *implicit*
    >> ordering in the streaming case that is due to the nature of the
    >> processing - the difference between watermark and timestamp of
    >> elements flowing through the pipeline is generally low (too high a
    >> difference leads to the overbuffering problem), but there is no such
    >> bound on batch.
    > Fortunately, for batch, only the state for a single key needs to be
    > preserved at a time, rather than the state for all keys across the
    > range of skew. Of course if you have few or hot keys, one can still
    > have issues (and this is not specific to StatefulDoFns).
    >
    >> As a result, I see a few possible solutions:
    >>
    >>    - the best and most natural seems to be an extension of the model,
    >> so that it defines batch not only as "streaming pipeline executed in
    >> batch fashion", but as "pipeline with at least as good runtime
    >> characteristics as in the streaming case, executed in batch fashion".
    >> I really don't think that there are any conflicts with the current
    >> model, or that this could affect performance, because the required
    >> sorting (as pointed out by Aljoscha) is very probably already done
    >> during translation of stateful pardos. Also note that this definition
    >> only affects user defined stateful pardos
    > I don't see batch vs. streaming as part of the model. One can have
    > microbatch, or even a runner that alternates between different modes.
    > The model describes what the valid outputs are given a (sometimes
    > partial) set of inputs. It becomes really hard to define things like
    > "as good runtime characteristics." Once you allow any
    > out-of-orderedness, it is not very feasible to try and define (and
    > more cheaply implement) an "upper bound" of acceptable
    > out-of-orderedness.
    >
    > Pipelines that fail in the "worst case" batch scenario are likely to
    > degrade poorly (possibly catastrophically) when the watermark falls
    > behind in streaming mode as well.
    >
    >>    - another option would be to introduce an annotation for DoFns
    >> (e.g. @RequiresStableTimeCharacteristics), which would result in the
    >> sorting in the batch case - but - this extension would have to ensure
    >> the sorting in streaming mode also - it would require a definition of
    >> allowed lateness and trigger (essentially similar to window)
    > This might be reasonable, implemented by default by buffering
    > everything and releasing elements as the watermark (+lateness)
    > advances, but would likely lead to inefficient (though *maybe* easier
    > to reason about) code. Not sure about the semantics of triggering
    > here, especially data-driven triggers. Would it be roughly equivalent
    > to GBK + FlatMap(lambda (key, values): [(key, value) for value in
    > values])?
    >
    > Or is the underlying desire just to be able to hint to the runner that
    > the code may perform better (e.g. require less resources) as skew is
    > reduced (and hence to order by timestamp iff it's cheap)?
    >
    >>    - the last option would be to introduce these "higher order
    >> guarantees" in some extension DSL (e.g. Euphoria), but that seems to
    >> be the worst option to me
    >>
    >> I see the first two options as quite equally good, although the latter
    >> one is probably more time consuming to implement. But it would bring
    >> an additional feature to the streaming case as well.
    >>
    >> Thanks for any thoughts.
    >>
    >>    Jan
    >>
    >> On 5/20/19 12:41 PM, Robert Bradshaw wrote:
    >>> On Fri, May 17, 2019 at 4:48 PM Jan Lukavský <je...@seznam.cz
    <mailto:je...@seznam.cz>> wrote:
    >>>> Hi Reuven,
    >>>>
    >>>>> How so? AFAIK stateful DoFns work just fine in batch runners.
    >>>> Stateful ParDo works in batch as long as the logic inside the
    >>>> state works for absolutely unbounded out-of-orderedness of elements.
    >>>> That basically (practically) can work only for cases where the
    >>>> order of input elements doesn't matter. But "state" can refer to a
    >>>> "state machine", and any time you have a state machine involved,
    >>>> the ordering of elements matters.
    >>> No guarantees on order are provided in *either* streaming or batch
    >>> mode by the model. However, it is the case that most streaming
    >>> runners attempt to limit the amount of out-of-orderedness of
    >>> elements (in terms of event time vs. processing time) in order to
    >>> make forward progress, which in turn could help cap the amount of
    >>> state that must be held concurrently, whereas a batch runner may not
    >>> allow any state to be safely discarded until the whole timeline from
    >>> infinite past to infinite future has been observed.
    >>>
    >>> Also, as pointed out, state is not preserved "batch to batch" in
    >>> batch mode.
    >>>
    >>>
    >>> On Thu, May 16, 2019 at 3:59 PM Maximilian Michels
    <m...@apache.org <mailto:m...@apache.org>> wrote:
    >>>
    >>>>>    batch semantics and streaming semantics differ only in that I
    >>>>> can have GlobalWindow with default trigger on batch and cannot on
    >>>>> stream
    >>>> You can have a GlobalWindow in streaming with a default trigger. You
    >>>> could define additional triggers that do early firings. And you
    >>>> could even trigger the global window by advancing the watermark to
    >>>> +inf.
    >>> IIRC, as a pragmatic note, we prohibited global window with default
    >>> trigger on unbounded PCollections in the SDK because this is more
    >>> likely to be user error than an actual desire to have no output until
    >>> drain. But it's semantically valid in the model.
