Hi,

this is follow up of multiple threads covering the topic of how to (in a unified way) process event streams. Event streams can be characterized by a common property that ordering of events matter. The processing (usually) looks something like

  unordered stream -> buffer (per key) -> ordered stream -> stateful logic (DoFn)

This is perfectly fine and can be solved by current tools Beam offers (state & timers), but *only for streaming case*. The batch case is essentially broken, because:

 a) out-of-orderness is essentially *unbounded* (as opposed to input being bounded, strangely, that is not a contradiction), out-of-orderness in streaming case is *bounded*, because the watermark can fall behind only limit amount of time (sooner or later, nobody would actually care about results from streaming pipeline being months or years late, right?)

 b) with unbounded out-of-orderness, the spatial requirements of state grow with O(N), worst case, where N is size of the whole input

 c) moreover, many runners restrict the size of state per key to fit in memory (spark, flink)

Now, solutions to this problems seem to be:

 1) refine the model guarantees for batch stateful processing, so that we limit the out-of-orderness (the source of issues here) - the only reasonable way to do that is to enforce sorting before all stateful dofns in batch case (perhaps there might opt-out for that), or

 2) define a way to mark stateful dofn as requiring the sorting (e.g. @RequiresTimeSortedInput) - note this has to be done for both batch and streaming case, as opposed to 1), or

 3) define a different URN for "ordered stateful dofn", with default expansion using state as buffer (for both batch and streaming case) - that way this can be overridden in batch runners that can get into trouble otherwise (and could be regarded as sort of natural extension of the current approach).

I still think that the best solution is 1), for multiple reasons going from being internally logically consistent to being practical and easily implemented (a few lines of code in flink's case for instance). On the other hand, if this is really not what we want to do, then I'd like to know the community's opinion on the two other options (or, if there maybe is some other option I didn't cover).

Many thanks for opinions and help with fixing what is (sort of) broken right now.

Jan

Reply via email to