One concern with (1) is that it may not be cheap to do for all
runners. There also seems to be the implication that in batch, elements
would be 100% in order, while in streaming kind-of-in-order is OK, which
would lead to pipelines being developed and tested against stronger
guarantees than are generally provided in a streaming system. It also
means batch and streaming have different semantics, not just different
runtime characteristics. (Note also that for streaming the
out-of-order limits are essentially unbounded as well, but if you fall
"too far" behind you generally have other problems, so in practice it's
OK for a "healthy" pipeline.)

I think (2) is the most consistent, as we can't meaningfully bound the
amount of out-of-orderness in a way that would let us say a particular
runner (or mode) has violated it.
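
For concreteness, a minimal sketch of what (2) could look like at the
user level, assuming the annotation sits on the @ProcessElement method
(Event is a hypothetical element type, Beam SDK imports elided):

  class OrderedStatefulFn extends DoFn<KV<String, Event>, KV<String, Event>> {

    @StateId("lastSeen")
    private final StateSpec<ValueState<Instant>> lastSeen =
        StateSpecs.value(InstantCoder.of());

    @RequiresTimeSortedInput // the marker proposed in (2)
    @ProcessElement
    public void process(
        ProcessContext ctx, @StateId("lastSeen") ValueState<Instant> last) {
      // With time-sorted input per key this invariant holds, and the
      // stateful logic needs no reordering buffer of its own.
      Instant prev = last.read();
      if (prev != null && prev.isAfter(ctx.timestamp())) {
        throw new IllegalStateException("out-of-order input");
      }
      last.write(ctx.timestamp());
      ctx.output(ctx.element());
    }
  }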

On Tue, Nov 12, 2019 at 1:36 AM Jan Lukavský <[email protected]> wrote:
>
> Hi,
>
> this is a follow-up to multiple threads covering the topic of how to (in
> a unified way) process event streams. Event streams can be characterized
> by a common property: the ordering of events matters. The processing
> (usually) looks something like
>
>    unordered stream -> buffer (per key) -> ordered stream -> stateful
> logic (DoFn)
>
> This is perfectly fine and can be solved with the tools Beam currently
> offers (state & timers) - see the sketch after the list below - but
> *only for the streaming case*. The batch case is essentially broken,
> because:
>
>   a) out-of-orderness is essentially *unbounded* (as opposed to the input
> being bounded - strangely, that is not a contradiction), whereas
> out-of-orderness in the streaming case is *bounded*, because the
> watermark can fall behind only a limited amount of time (sooner or later,
> nobody would actually care about results from a streaming pipeline being
> months or years late, right?)
>
>   b) with unbounded out-of-orderness, the space requirements of state
> grow as O(N) in the worst case, where N is the size of the whole input
>
>   c) moreover, many runners restrict the size of state per key to fit in
> memory (Spark, Flink)
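>
> (For reference, the state & timers buffering mentioned above looks
> roughly like the sketch below. Event is a hypothetical element type,
> Beam SDK imports are elided, and it glosses over output-timestamp and
> watermark-hold details:)
>
>   class SortBufferDoFn extends DoFn<KV<String, Event>, KV<String, Event>> {
>
>     // Per-key buffer of elements not yet behind the watermark.
>     @StateId("buffer")
>     private final StateSpec<BagState<TimestampedValue<KV<String, Event>>>>
>         bufferSpec = StateSpecs.bag();
>
>     // Event-time timer firing once the watermark passes buffered data.
>     @TimerId("flush")
>     private final TimerSpec flushSpec =
>         TimerSpecs.timer(TimeDomain.EVENT_TIME);
>
>     @ProcessElement
>     public void process(
>         ProcessContext ctx,
>         @StateId("buffer") BagState<TimestampedValue<KV<String, Event>>> buf,
>         @TimerId("flush") Timer flush) {
>       buf.add(TimestampedValue.of(ctx.element(), ctx.timestamp()));
>       // Wake up when the watermark reaches this element; a later set()
>       // only postpones the flush, it never breaks the ordering.
>       flush.set(ctx.timestamp());
>     }
>
>     @OnTimer("flush")
>     public void flush(
>         OnTimerContext ctx,
>         @StateId("buffer") BagState<TimestampedValue<KV<String, Event>>> buf,
>         @TimerId("flush") Timer flush) {
>       List<TimestampedValue<KV<String, Event>>> ready = new ArrayList<>();
>       List<TimestampedValue<KV<String, Event>>> pending = new ArrayList<>();
>       for (TimestampedValue<KV<String, Event>> tv : buf.read()) {
>         (tv.getTimestamp().isAfter(ctx.timestamp()) ? pending : ready).add(tv);
>       }
>       // Everything at or behind the watermark can be emitted, in order.
>       ready.sort(Comparator.comparing(TimestampedValue::getTimestamp));
>       for (TimestampedValue<KV<String, Event>> tv : ready) {
>         ctx.outputWithTimestamp(tv.getValue(), tv.getTimestamp());
>       }
>       buf.clear();
>       pending.forEach(buf::add);
>       // Re-arm for the earliest element still ahead of the watermark.
>       pending.stream()
>           .map(TimestampedValue::getTimestamp)
>           .min(Comparator.naturalOrder())
>           .ifPresent(flush::set);
>     }
>   }
>
> (In streaming this buffer stays small because the watermark keeps
> moving; in batch, per a) and b) above, it can end up holding nearly the
> whole input for a key.)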
>
> Now, solutions to these problems seem to be:
>
>   1) refine the model guarantees for batch stateful processing, so that
> we limit the out-of-orderness (the source of the issues here) - the only
> reasonable way to do that is to enforce sorting before all stateful
> DoFns in the batch case (perhaps with an opt-out; see the sketch after
> this list), or
>
>   2) define a way to mark a stateful DoFn as requiring sorted input
> (e.g. @RequiresTimeSortedInput) - note this has to be done for both the
> batch and streaming cases, as opposed to 1), or
>
>   3) define a different URN for an "ordered stateful DoFn", with a
> default expansion using state as a buffer (for both the batch and
> streaming cases) - that way it can be overridden in batch runners that
> would otherwise get into trouble (and it could be regarded as a sort of
> natural extension of the current approach).
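>
> (Conceptually, 1) asks a batch runner to do the following in front of
> each stateful DoFn - plain-Java pseudocode of the idea, not actual
> runner code; collectBoundedInputPerKey and feedToStatefulDoFn are
> hypothetical:)
>
>   // The input is bounded, so the runner can group it per key, sort each
>   // group by event timestamp, and only then run the stateful DoFn.
>   Map<String, List<TimestampedValue<Event>>> perKey =
>       collectBoundedInputPerKey();
>   for (Map.Entry<String, List<TimestampedValue<Event>>> e :
>       perKey.entrySet()) {
>     List<TimestampedValue<Event>> elements = e.getValue();
>     elements.sort(Comparator.comparing(TimestampedValue::getTimestamp));
>     // The DoFn now sees time-sorted input, so the buffering state (and
>     // the problems in a)-c) above) disappears.
>     feedToStatefulDoFn(e.getKey(), elements);
>   }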
>
> I still think that the best solution is 1), for multiple reasons,
> ranging from being internally logically consistent to being practical
> and easy to implement (a few lines of code in Flink's case, for
> instance). On the other hand, if this is really not what we want to do,
> then I'd like to know the community's opinion on the two other options
> (or on some other option I didn't cover).
>
> Many thanks for opinions and help with fixing what is (sort of) broken
> right now.
>
> Jan
>
