On 5/20/19 1:39 PM, Reuven Lax wrote:


On Mon, May 20, 2019 at 4:19 AM Jan Lukavský <je...@seznam.cz> wrote:

    Hi Robert,

    yes, I think you rephrased my point - although no *explicit*
    guarantees of ordering are given in either mode, there is *implicit*
    ordering in the streaming case due to the nature of the processing -
    the difference between the watermark and the timestamps of elements
    flowing through the pipeline is generally low (too large a
    difference leads to the overbuffering problem), but there is no such
    bound in batch.


Often not. I've seen many cases where a streaming pipeline falls behind by hours or days (usually because of external problems such as service outages). This is where watermark semantics are needed the most, to make sure that the output is still correct while the pipeline catches up. While it's true that in the happy case a streaming pipeline is getting all records close to real time and the out-of-orderness is bounded, we should design semantics that extend to the unhappy case as well, as the real world is adept at giving us such scenarios.

Absolutely true, but really not in conflict with my proposals. Although the watermark might fall really far behind real time, one of two things eventually happens, either of which mitigates the problem:

 a) the watermark starts moving again before buffers are exhausted, or

 b) the pipeline fails - but thanks to checkpointing it is restored into a state from which it can start running again and eventually satisfy condition a).

On the other hand, once you reach state b) in the batch case, you will probably never leave it (no matter how often you restart the pipeline; the only solution is to add more resources).



    As a result, I see a few possible solutions:

      - the best and most natural one seems to be an extension of the
    model, so that it defines batch not only as "a streaming pipeline
    executed in a batch fashion", but as "a pipeline with at least as
    good runtime characteristics as in the streaming case, executed in
    a batch fashion". I really don't think that this conflicts with the
    current model, or that it could affect performance, because the
    required sorting (as pointed out by Aljoscha) is very probably
    already done during translation of stateful pardos. Also note that
    this definition only affects user-defined stateful pardos

      - another option would be to introduce an annotation for DoFns
    (e.g. @RequiresStableTimeCharacteristics), which would result in the
    sorting in the batch case - but this extension would have to ensure
    the sorting in streaming mode as well - it would require a
    definition of allowed lateness and trigger (essentially similar to
    a window); see the sketch after this list

      - the last option would be to introduce these "higher order
    guarantees" in some extension DSL (e.g. Euphoria), but that seems
    to be the worst option to me
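
    For illustration, a minimal sketch of what the second option might
    look like at the API level. The annotation is the hypothetical one
    proposed above - it does not exist in the Beam SDK, and its
    parameter is purely illustrative:

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    // Hypothetical annotation: asks the runner to deliver elements to
    // this stateful DoFn in event-time order - by sorting the input in
    // batch, and by buffering it up to the declared allowed lateness
    // in streaming.
    // @RequiresStableTimeCharacteristics(allowedLateness = "PT1H")
    class TransitionFn extends DoFn<KV<String, String>, String> {
      @ProcessElement
      public void process(
          @Element KV<String, String> e, OutputReceiver<String> out) {
        // ... order-dependent logic, safe under the above guarantee
        out.output(e.getKey() + ":" + e.getValue());
      }
    }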

    I see the first two options as quite equally good, although the
    latter one is probably more time-consuming to implement. But it
    would bring an additional feature to the streaming case as well.

    Thanks for any thoughts.

      Jan

    On 5/20/19 12:41 PM, Robert Bradshaw wrote:
    > On Fri, May 17, 2019 at 4:48 PM Jan Lukavský <je...@seznam.cz> wrote:
    >> Hi Reuven,
    >>
    >>> How so? AFAIK stateful DoFns work just fine in batch runners.
    >> Stateful ParDo works in batch only insofar as the logic inside
    >> the state tolerates absolutely unbounded out-of-orderness of
    >> elements. Practically, that can work only in cases where the
    >> order of input elements doesn't matter. But "state" can refer to
    >> a "state machine", and any time you have a state machine
    >> involved, the ordering of elements matters.
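
    For illustration, a sketch of such an order-sensitive stateful
    DoFn - a per-key state machine that flags a CLOSE event arriving
    before its OPEN. With unordered batch input it can misreport
    perfectly valid data. The event names are made up for the example:

    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.ValueState;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    class SessionMachineFn extends DoFn<KV<String, String>, String> {
      @StateId("last")
      private final StateSpec<ValueState<String>> lastSpec = StateSpecs.value();

      @ProcessElement
      public void process(
          @Element KV<String, String> e,
          @StateId("last") ValueState<String> last,
          OutputReceiver<String> out) {
        String previous = last.read();  // null for the first element of a key
        String event = e.getValue();    // "OPEN" or "CLOSE"
        // A CLOSE seen before its OPEN is flagged as invalid - which is
        // exactly what happens when batch delivers elements out of order.
        if ("CLOSE".equals(event) && !"OPEN".equals(previous)) {
          out.output(e.getKey() + ": invalid transition");
        }
        last.write(event);
      }
    }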
    > No guarantees on order are provided by the model in *either*
    > streaming or batch mode. However, it is the case that in order to
    > make forward progress most streaming runners attempt to limit the
    > amount of out-of-orderness of elements (in terms of event time
    > vs. processing time), which in turn could help cap the amount of
    > state that must be held concurrently, whereas a batch runner may
    > not allow any state to be safely discarded until the whole
    > timeline from infinite past to infinite future has been observed.
    >
    > Also, as pointed out, state is not preserved "batch to batch" in
    > batch mode.
    >
    >
    > On Thu, May 16, 2019 at 3:59 PM Maximilian Michels <m...@apache.org> wrote:
    >
    >>> batch semantics and streaming semantics differ only in that I
    >>> can have GlobalWindow with default trigger on batch and cannot
    >>> on stream
    >> You can have a GlobalWindow in streaming with a default trigger.
    >> You could define additional triggers that do early firings. And
    >> you could even trigger the global window by advancing the
    >> watermark to +inf.
    > IIRC, as a pragmatic note, we prohibited global window with
    > default trigger on unbounded PCollections in the SDK because this
    > is more likely to be user error than an actual desire to have no
    > output until drain. But it's semantically valid in the model.
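
    For reference, a minimal sketch of the setup Max describes - a
    global window on an unbounded collection with an early-firing
    trigger. Here "input" stands for an assumed unbounded
    PCollection<KV<String, Long>>, and the one-minute delay is an
    arbitrary choice:

    import org.joda.time.Duration;
    import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
    import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
    import org.apache.beam.sdk.transforms.windowing.Repeatedly;
    import org.apache.beam.sdk.transforms.windowing.Window;

    // The default trigger alone would fire only when the watermark
    // advances to +inf (i.e. at drain); the early firing below emits a
    // pane repeatedly, at most one minute after the first element of
    // each pane arrives.
    input.apply(
        Window.<KV<String, Long>>into(new GlobalWindows())
            .triggering(
                Repeatedly.forever(
                    AfterProcessingTime.pastFirstElementInPane()
                        .plusDelayOf(Duration.standardMinutes(1))))
            .withAllowedLateness(Duration.ZERO)
            .accumulatingFiredPanes());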
