On Tue, Nov 12, 2019 at 1:36 AM Jan Lukavský <[email protected]> wrote:
> Hi,
>
> this is a follow-up to multiple threads covering the topic of how to (in a
> unified way) process event streams. Event streams can be characterized
> by a common property: the ordering of events matters.

1. Events are ordered (hence timestamps).
2. Most operators do not depend on order; operators that do depend on some
order do not depend on the total order.
3. Real-world data is inherently likely to have a distribution of disorder
with an unboundedly long tail.

> The processing (usually) looks something like
>
>     unordered stream -> buffer (per key) -> ordered stream -> stateful
>     logic (DoFn)

This is a pre-watermark, pre-Beam approach to processing data. It drops
more data and/or introduces more latency, but can lead to simpler or more
efficient operator implementations (though not always). I do think it seems
OK to add this to Beam in some form, as a convenience for when the user
knows something about their data and/or DoFn. The risk, of course, is that
users reach for this when they shouldn't, thinking it is simpler without
considering the complexity of avoiding dropping too much data.

This thread seems to be a continuation of the other thread I just responded
to. It would be good to keep them tied together to avoid duplicate
responses.

Kenn

> This is perfectly fine and can be solved by the current tools Beam offers
> (state & timers), but *only for the streaming case*. The batch case is
> essentially broken, because:
>
> a) out-of-orderness is essentially *unbounded* (as opposed to the input
> being bounded; strangely, that is not a contradiction). Out-of-orderness
> in the streaming case is *bounded*, because the watermark can fall behind
> only a limited amount of time (sooner or later, nobody would actually care
> about results from a streaming pipeline being months or years late, right?)
>
> b) with unbounded out-of-orderness, the spatial requirements of state
> grow with O(N), worst case, where N is the size of the whole input
>
> c) moreover, many runners restrict the size of state per key to fit in
> memory (Spark, Flink)
>
> Now, the solutions to these problems seem to be:
>
> 1) refine the model guarantees for batch stateful processing, so that
> we limit the out-of-orderness (the source of the issues here) - the only
> reasonable way to do that is to enforce sorting before all stateful
> DoFns in the batch case (perhaps with an opt-out for that), or
>
> 2) define a way to mark a stateful DoFn as requiring the sorting (e.g.
> @RequiresTimeSortedInput) - note this has to be done for both the batch
> and the streaming case, as opposed to 1), or
>
> 3) define a different URN for an "ordered stateful DoFn", with a default
> expansion using state as a buffer (for both the batch and the streaming
> case) - that way it can be overridden in batch runners that could get into
> trouble otherwise (and could be regarded as a sort of natural extension of
> the current approach).
>
> I still think that the best solution is 1), for multiple reasons, ranging
> from being internally logically consistent to being practical and easily
> implemented (a few lines of code in Flink's case, for instance). On the
> other hand, if this is really not what we want to do, then I'd like to
> know the community's opinion on the two other options (or, if there
> maybe is some other option I didn't cover).
>
> Many thanks for opinions and help with fixing what is (sort of) broken
> right now.
>
> Jan
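[Editor's note: the "buffer (per key) -> ordered stream" step Jan describes, and the state-growth concern in (b), can be illustrated with a small runner-agnostic sketch. This is not Beam code; `sort_buffered` and its single-variable watermark are simplified illustrations, under the assumption that the watermark simply tracks the highest timestamp seen. With a bounded allowed lateness the buffer stays bounded (at the cost of dropping very late events); with unbounded lateness nothing can be flushed early and, worst case, the buffer holds the entire input, which is exactly the O(N) batch problem.]

```python
# Sketch (not Beam code): per-key buffering of an out-of-order event
# stream, releasing events in timestamp order once the watermark has
# passed them by more than allowed_lateness.
import heapq
from collections import defaultdict


def sort_buffered(events, allowed_lateness):
    """events: iterable of (key, timestamp, value), possibly out of order.
    Yields (key, timestamp, value) in timestamp order per key, dropping
    events that arrive more than allowed_lateness behind the watermark."""
    buffers = defaultdict(list)  # key -> min-heap of (timestamp, value)
    watermark = float("-inf")    # simplistic watermark: max timestamp seen
    for key, ts, value in events:
        if ts < watermark - allowed_lateness:
            continue             # too late: dropped (the cost of bounding state)
        heapq.heappush(buffers[key], (ts, value))
        watermark = max(watermark, ts)
        # Flush events that can no longer be preceded by any accepted event.
        for k, heap in buffers.items():
            while heap and heap[0][0] <= watermark - allowed_lateness:
                t, v = heapq.heappop(heap)
                yield k, t, v
    # End of (bounded) input: flush whatever is still buffered, in order.
    for k, heap in buffers.items():
        while heap:
            t, v = heapq.heappop(heap)
            yield k, t, v
```

With a large `allowed_lateness` everything is buffered until the end of input (the batch-like case); with `allowed_lateness=0` events are emitted eagerly, but any event arriving behind the watermark is dropped.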
