One concern with (1) is that it may not be cheap to do for all runners. There also seems to be the implication that in batch elements would be 100% in order but in streaming kind-of-in-order is OK, which would lead to pipelines being developed/tested against stronger guarantees than are generally provided in a streaming system. It also means batch and streaming have different semantics, not just different runtime characteristics, etc. (Note also that for streaming the out-of-order limits are essentially unbounded as well, but if you fall "too far" behind you generally have other problems so in practice it's OK for a "healthy" pipeline.)
I think (2) is the most consistent, as we can't meaningfully bound the amount of out-of-orderness in a way that would let us say a particular runner (or mode) has violated it.

On Tue, Nov 12, 2019 at 1:36 AM Jan Lukavský <[email protected]> wrote:
>
> Hi,
>
> this is a follow-up of multiple threads covering the topic of how to (in a
> unified way) process event streams. Event streams can be characterized
> by a common property that the ordering of events matters. The processing
> (usually) looks something like
>
>   unordered stream -> buffer (per key) -> ordered stream -> stateful
>   logic (DoFn)
>
> This is perfectly fine and can be solved by the tools Beam currently
> offers (state & timers), but *only for the streaming case*. The batch
> case is essentially broken, because:
>
>  a) out-of-orderness is essentially *unbounded* (as opposed to the input
> being bounded; strangely, that is not a contradiction), while
> out-of-orderness in the streaming case is *bounded*, because the
> watermark can fall behind only a limited amount of time (sooner or
> later, nobody would actually care about results from a streaming
> pipeline being months or years late, right?)
>
>  b) with unbounded out-of-orderness, the space requirements of state
> grow with O(N) in the worst case, where N is the size of the whole input
>
>  c) moreover, many runners restrict the size of state per key to fit in
> memory (Spark, Flink)
>
> Now, the solutions to these problems seem to be:
>
>  1) refine the model guarantees for batch stateful processing, so that
> we limit the out-of-orderness (the source of the issues here) - the only
> reasonable way to do that is to enforce sorting before all stateful
> DoFns in the batch case (perhaps with an opt-out), or
>
>  2) define a way to mark a stateful DoFn as requiring sorted input (e.g.
> @RequiresTimeSortedInput) - note this has to be done for both the batch
> and streaming cases, as opposed to 1), or
>
>  3) define a different URN for an "ordered stateful DoFn", with a
> default expansion using state as a buffer (for both the batch and
> streaming cases) - that way this can be overridden in batch runners that
> could otherwise get into trouble (and it could be regarded as a sort of
> natural extension of the current approach).
>
> I still think that the best solution is 1), for multiple reasons,
> ranging from being internally logically consistent to being practical
> and easily implemented (a few lines of code in Flink's case, for
> instance). On the other hand, if this is really not what we want to do,
> then I'd like to know the community's opinion on the two other options
> (or, if there maybe is some other option I didn't cover).
>
> Many thanks for opinions and help with fixing what is (sort of) broken
> right now.
>
> Jan
>
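For readers following along, the "buffer (per key) -> ordered stream" pattern the quoted email describes can be sketched in plain Java. This is a minimal, self-contained illustration, not the Beam API: the class and method names here are hypothetical stand-ins for what a stateful DoFn would do with a BagState buffer and an event-time timer.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the per-key buffering pattern from the email:
// hold out-of-order elements in per-key state, then release them in
// timestamp order once the watermark passes them.
class TimeSortingBuffer {
    // Per-key buffer of (timestamp, value) pairs, analogous to a BagState.
    private final Map<String, List<long[]>> buffers = new HashMap<>();

    // Buffer an element that may arrive out of timestamp order.
    void add(String key, long timestamp, long value) {
        buffers.computeIfAbsent(key, k -> new ArrayList<>())
               .add(new long[] {timestamp, value});
    }

    // When the watermark advances, flush all buffered elements with
    // timestamp <= watermark, sorted by timestamp; the stateful logic
    // downstream then sees an ordered stream.
    List<long[]> onWatermark(String key, long watermark) {
        List<long[]> buf = buffers.getOrDefault(key, new ArrayList<>());
        List<long[]> ready = new ArrayList<>();
        List<long[]> remaining = new ArrayList<>();
        for (long[] e : buf) {
            if (e[0] <= watermark) {
                ready.add(e);
            } else {
                remaining.add(e);
            }
        }
        ready.sort(Comparator.comparingLong(e -> e[0]));
        buffers.put(key, remaining);
        return ready;
    }
}
```

This also makes the batch problem in a) and b) concrete: in batch the watermark typically jumps from -infinity to +infinity only at the end of the input, so nothing is ever released early and the buffer must hold all N elements per key, which is exactly the O(N) state growth the email points out.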
