Hi Kenn,

I'll quote here all the recent comments (from this and the (closed) thread [1]):

> Your proposed feature is sensitive to all data that is not in timestamp order, which is not the same as late. In Beam "late" is defined as "assigned to a window where the watermark has passed the end of the window and a 'final' aggregate has been produced". Your proposal is not really sensitive to this form of late data.

> I think there is some published work that will help you particularly in addressing out-of-order data. Note that this is not the normal notion of late. Trill has a high-watermark driven sorting buffer prior to sending elements in order to stateful operators. It is similar to your sketched algorithm for emitting elements as the watermark passes. I believe Gearpump also uses a sorting buffer and processes in order, and we do have a Gearpump runner still here in our repo.

> This is a pre-watermark, pre-Beam approach to processing data. It drops more data and/or introduces more latency, but can lead to simpler or more efficient operator implementations (but not always).

> I do think it seems OK to add this to Beam in some form as a convenience when the user knows something about their data and/or DoFn. The risk of course is that users go for this when they shouldn't, thinking it is simpler without considering the complexity of how to avoid dropping too much data.

All these seem related. Let me try to explain. I'll try to keep this as short as possible, but because the topic is not trivial and possibly touches many related problems, I might fail to do so. Please bear with me. :)

I'm aware of the difference between out-of-order data and late data. I don't think that the (generally) described scheme "unordered stream -> buffer -> ordered stream" has anything to do with how the model (or runner) handles time progress. It is a general idea, and the properties of this computation scheme depend on how we define the condition under which the buffer gets flushed (this can - and, for the reasons you mention, should - be driven by watermark progress). The PR [2] already works on watermark progress (because it is driven by event-time timers). Unless I'm missing something, this approach seems very much aligned with the "current Beam" approach - please correct me if I'm wrong.
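To make the flush condition concrete, here is a minimal plain-Python model of the scheme (a heapq min-heap standing in for the timer-driven per-key state; this is just the algorithm, not the Beam API):

```python
import heapq

def sorted_flush(buffer, watermark):
    """Pop and return, in timestamp order, all buffered elements whose
    timestamp is strictly below the watermark; later elements stay buffered."""
    out = []
    while buffer and buffer[0][0] < watermark:
        out.append(heapq.heappop(buffer))
    return out

# Unordered input: (timestamp, value) pairs.
buffer = []
for event in [(3, "c"), (1, "a"), (4, "d"), (2, "b")]:
    heapq.heappush(buffer, event)

# Watermark advances to 4: elements 1..3 are emitted in order, 4 is held back.
print(sorted_flush(buffer, 4))  # [(1, 'a'), (2, 'b'), (3, 'c')]
print(buffer)                   # [(4, 'd')]
```

The only "policy" in the scheme is the comparison against the watermark; everything else is just buffering.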

There are two main open issues with the currently proposed approach (that I'm aware of):

 a) as opposed to the original proposal, the current implementation does not enable a UDF for extraction of some fine-grained time to be used as the sorting criterion (e.g. sequential IDs that might be present in the data)

 b) it doesn't handle allowed lateness correctly (this is probably what you are referring to)

Issue a) is sort of trivial; it is just about how to do this in the way most compatible with the rest of the Beam APIs. Moreover, it can be trivially postponed and solved later, so I would ignore it for now.

Issue b) is much worse. The current approach is to force allowed lateness to zero, dropping all late elements. I'm not fine with that (and maybe as part of the review it might be concluded that a better approach would be to actually wait until the watermark reaches T + allowedLateness before flushing the buffer - consistent in dropping only really late elements, but introducing more latency). There is a 'correct' solution to this, though. Very roughly outlined:
 a) create single input buffer, let's call this "input buffer"
 b) create two copies of the stateful DoFn consuming the sorted stream (scope all created states and timers with an additional dimension - one is "on time" and the other "late" - that is, state and timers would reside in namespace (k, v, "on time" | "late")) - let's call these two respective DoFns "on time" and "late"
 c) on each watermark update, do the following:
  i) take all elements with timestamp less than the watermark, sort them and flush them to the consuming "on time" stateful DoFn, _storing all output produced by the DoFn into a separate state (let's call it "output buffer"), passing it downstream as well_

  ii) take all elements with timestamp less than watermark - allowedLateness, sort them and pass them to the consuming "late" stateful DoFn, _storing all results into a "late output buffer", not passing anything to the output_

  iii) compare the "output buffer" with the "late output buffer" and *retract elements that were part of the output buffer but not present in the late output buffer, and output elements that were not in the output buffer but were present in the late output buffer*

  iv) trim the "output buffer", "late output buffer" and "input buffer" to contain only elements with timestamp not preceding watermark - allowedLateness
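The diff in step iii) can be sketched in plain Python (Counter multisets standing in for the two output-buffer state cells; the element values are hypothetical):

```python
from collections import Counter

def diff_outputs(on_time_output, late_output):
    """Compare the 'on time' and 'late' output buffers and compute what to
    retract and what to newly emit, treating each buffer as a multiset."""
    on_time = Counter(on_time_output)
    late = Counter(late_output)
    # Produced by the on-time pass but absent once late data was included.
    retractions = list((on_time - late).elements())
    # Produced only once late data was included.
    additions = list((late - on_time).elements())
    return retractions, additions

# Hypothetical aggregate: the on-time pass emitted sum 10 for key "k";
# after late data arrived, the correct sum is 12.
retract, emit = diff_outputs([("k", 10)], [("k", 12)])
print(retract)  # [('k', 10)]
print(emit)     # [('k', 12)]
```

Outputs present in both buffers need no action, which is what keeps the scheme incremental.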

I currently have a strong belief that this can actually serve as the base of a fully generic solution to the retraction problem (it would only require handling all stateful operations as stateful DoFns when retractions are enabled in the pipeline), with one piece missing - the retractions would have to propagate to outputs as well. That might still be tricky. Moreover, there are some rough edges; for example, the late trigger might need to fire with every late data element (similarly to the default trigger), which would be more complicated.

Anyway, because Beam currently doesn't support retractions (though this sorting approach might help a lot!) and I generally like small baby steps towards fully general solutions, I'm proposing that the first iteration either force allowedLateness to zero, or hold the outputs back until the watermark passes allowedLateness. These two approaches can be discussed as part of the PR review process. Nevertheless, I'd like to get to the fully working solution in the future.
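As a plain-Python sketch (function and parameter names hypothetical), the two first-iteration options differ only in the threshold below which the buffer is flushed (and below which later arrivals are dropped):

```python
def flush_threshold(watermark, allowed_lateness, hold_outputs):
    """Timestamp below which the sorting buffer may be flushed.

    hold_outputs=False: flush at the watermark; any element arriving with a
    smaller timestamp is dropped (equivalent to forcing allowedLateness to 0).
    hold_outputs=True: wait an extra allowedLateness before flushing, so only
    really late elements are dropped, at the cost of extra latency.
    """
    return watermark if not hold_outputs else watermark - allowed_lateness

print(flush_threshold(100, 10, hold_outputs=False))  # 100
print(flush_threshold(100, 10, hold_outputs=True))   # 90
```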

Hope this was somewhat understandable; I was trying to simplify things as much as possible while still being able to capture the essence.

Jan

[1] https://lists.apache.org/thread.html/6d2ea0cc0b5bbc5286d32442b852b25262b834be405956af7d5efac2@%3Cdev.beam.apache.org%3E

[2] https://github.com/apache/beam/pull/8774

On 11/15/19 7:01 AM, Kenneth Knowles wrote:


On Tue, Nov 12, 2019 at 1:36 AM Jan Lukavský <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:

    Hi,

    this is a follow-up of multiple threads covering the topic of how to
    (in a unified way) process event streams. Event streams can be
    characterized by a common property that ordering of events matters.


1. events are ordered (hence timestamps)
2. most operators do not depend on order / operators that depend on some order do not depend on the total order
3. real-world data is inherently likely to have a distribution of disorder that has an unboundedly long tail

    The processing (usually) looks something like

       unordered stream -> buffer (per key) -> ordered stream -> stateful logic (DoFn)


This is a pre-watermark, pre-Beam approach to processing data. It drops more data and/or introduces more latency, but can lead to simpler or more efficient operator implementations (but not always).

I do think it seems OK to add this to Beam in some form as a convenience when the user knows something about their data and/or DoFn. The risk of course is that users go for this when they shouldn't, thinking it is simpler without considering the complexity of how to avoid dropping too much data.

This thread seems to be a continuation of the other thread I just responded to. It would be good to try to keep them tied together to avoid duplicate responses.

Kenn

    This is perfectly fine and can be solved by current tools Beam offers
    (state & timers), but *only for the streaming case*. The batch case
    is essentially broken, because:

      a) out-of-orderness is essentially *unbounded* (as opposed to the
    input being bounded - strangely, that is not a contradiction), while
    out-of-orderness in the streaming case is *bounded*, because the
    watermark can fall behind only a limited amount of time (sooner or
    later, nobody would actually care about results from a streaming
    pipeline being months or years late, right?)

      b) with unbounded out-of-orderness, the spatial requirements of
    state grow with O(N) worst case, where N is the size of the whole
    input

      c) moreover, many runners restrict the size of state per key to
    fit in memory (Spark, Flink)

    Now, solutions to these problems seem to be:

      1) refine the model guarantees for batch stateful processing, so
    that we limit the out-of-orderness (the source of issues here) - the
    only reasonable way to do that is to enforce sorting before all
    stateful DoFns in the batch case (perhaps with an opt-out), or

      2) define a way to mark a stateful DoFn as requiring the sorting
    (e.g. @RequiresTimeSortedInput) - note this has to be done for both
    the batch and streaming case, as opposed to 1), or

      3) define a different URN for "ordered stateful DoFn", with a
    default expansion using state as a buffer (for both the batch and
    streaming case) - that way this can be overridden in batch runners
    that could get into trouble otherwise (and could be regarded as a
    sort of natural extension of the current approach).

    I still think that the best solution is 1), for multiple reasons,
    ranging from being internally logically consistent to being
    practical and easily implemented (a few lines of code in Flink's
    case, for instance). On the other hand, if this is really not what
    we want to do, then I'd like to know the community's opinion on the
    two other options (or maybe some other option I didn't cover).

    Many thanks for opinions and help with fixing what is (sort of)
    broken right now.

    Jan
