Hi Robert,

yes, I think you rephrased my point - although no *explicit* guarantees of ordering are given in either mode, there is *implicit* ordering in streaming case that is due to nature of the processing - the difference between watermark and timestamp of elements flowing through the pipeline are generally low (too high difference leads to the overbuffering problem), but there is no such bound on batch.

As a result, I see a few possible solutions:

 - the best and most natural seems to be extension of the model, so that it defines batch as not only "streaming pipeline executed in batch fashion", but "pipeline with at least as good runtime characteristics as in streaming case, executed in batch fashion", I really don't think that there are any conflicts with the current model, or that this could affect performance, because the required sorting (as pointed by Aljoscha) is very probably already done during translation of stateful pardos. Also note that this definition only affects user defined stateful pardos

 - another option would be to introduce annotation for DoFns (e.g. @RequiresStableTimeCharacteristics), which would result in the sorting in batch case - but - this extension would have to ensure the sorting in streaming mode also - it would require definition of allowed lateness, and triggger (essentially similar to window)

 - last option would be to introduce these "higher order guarantees" in some extension DSL (e.g. Euphoria), but that seems to be the worst option to me

I see the first two options quite equally good, although the letter one is probably more time consuming to implement. But it would bring additional feature to streaming case as well.

Thanks for any thoughts.

 Jan

On 5/20/19 12:41 PM, Robert Bradshaw wrote:
On Fri, May 17, 2019 at 4:48 PM Jan Lukavský <je...@seznam.cz> wrote:
Hi Reuven,

How so? AFAIK stateful DoFns work just fine in batch runners.
Stateful ParDo works in batch as far, as the logic inside the state works for absolutely unbounded 
out-of-orderness of elements. That basically (practically) can work only for cases, where the order 
of input elements doesn't matter. But, "state" can refer to "state machine", 
and any time you have a state machine involved, then the ordering of elements would matter.
No guarantees on order are provided in *either* streaming or batch
mode by the model. However, it is the case that in order to make
forward progress most streaming runners attempt to limit the amount of
out-of-orderedness of elements (in terms of event time vs. processing
time) to make forward progress, which in turn could help cap the
amount of state that must be held concurrently, whereas a batch runner
may not allow any state to be safely discarded until the whole
timeline from infinite past to infinite future has been observed.

Also, as pointed out, state is not preserved "batch to batch" in batch mode.


On Thu, May 16, 2019 at 3:59 PM Maximilian Michels <m...@apache.org> wrote:

  batch semantics and streaming semantics differs only in that I can have 
GlobalWindow with default trigger on batch and cannot on stream
You can have a GlobalWindow in streaming with a default trigger. You
could define additional triggers that do early firings. And you could
even trigger the global window by advancing the watermark to +inf.
IIRC, as a pragmatic note, we prohibited global window with default
trigger on unbounded PCollections in the SDK because this is more
likely to be user error than an actual desire to have no output until
drain. But it's semantically valid in the model.

Reply via email to