Hi Kenn,
I'll quote here all the recent comments (from this and the (closed)
thread [1]):
> Your proposed feature is sensitive to all data that is not in
timestamp order, which is not the same as late. In Beam "late" is
defined as "assigned to a window where the watermark has passed the end
of the window and a 'final' aggregate has been produced". Your proposal
is not really sensitive to this form of late data.
> I think there is some published work that will help you particularly
in addressing out-of-order data. Note that this is not the normal notion
of late. Trill has a high-watermark driven sorting buffer prior to
sending elements in order to stateful operators. It is similar to your
sketched algorithm for emitting elements as the watermark passes. I
believe Gearpump also uses a sorting buffer and processes in order, and
we do have a Gearpump runner still here in our repo.
> This is a pre-watermark, pre-Beam approach to processing data. It
drops more data and/or introduces more latency, but can lead to simpler
or more efficient operator implementations (but not always).
> I do think it seems OK to add this to Beam in some form as a
convenience when the user knows something about their data and/or DoFn.
The risk of course is that users go for this when they shouldn't,
thinking it is simpler without considering the complexity of how to
avoid dropping too much data.
All these seem related. Let me try to explain. I'll try to keep this as
short as possible, but because the topic is not trivial and possibly
touches many related problems, I might fail to do so. Please bear with me. :)
I'm aware of the difference between out-of-order data and late data. I
don't think that the (generally) described scheme "unordered stream ->
buffer -> ordered stream" has anything to do with how the model (or
runner) handles time progress. It is a general idea, and the properties
of this computation scheme depend on how we define the condition under
which the buffer gets flushed (this can - and, for the reasons you
mention, should - be driven by watermark progress). The PR [2] already
works on watermark progress (because it is driven by event-time timers).
If I'm not missing something, this approach seems to be very much
aligned with the "current Beam" approach - please correct me if I'm wrong.
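For illustration, the buffering scheme can be sketched in plain Java. This is a hypothetical, Beam-free simulation: a TreeMap stands in for per-key state, and advanceWatermark() stands in for the event-time timer callback that the PR actually uses; none of the names below are real Beam API.

```java
import java.util.*;

// Simplified model of "unordered stream -> buffer -> ordered stream":
// out-of-order elements are held in a per-key buffer and flushed in
// timestamp order whenever the (event-time) watermark advances.
class SortingBuffer<T> {
    private final NavigableMap<Long, List<T>> buffer = new TreeMap<>();

    // Buffer an element under its event timestamp.
    void add(long timestamp, T element) {
        buffer.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(element);
    }

    // Flush everything strictly before the new watermark, in timestamp order.
    List<T> advanceWatermark(long watermark) {
        List<T> out = new ArrayList<>();
        NavigableMap<Long, List<T>> ready = buffer.headMap(watermark, false);
        for (List<T> elems : ready.values()) {
            out.addAll(elems);
        }
        ready.clear(); // drop the flushed entries from the buffer
        return out;
    }
}
```

In a real runner the flush would be triggered by an event-time timer firing as the watermark passes, which is what makes the scheme watermark-driven rather than a fixed-size or wall-clock buffer.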
There are two main open issues with the currently proposed approach
(that I'm aware of):
a) as opposed to the original proposal, the current implementation
does not enable a UDF for extraction of some fine-grained time to be
used as a sorting criterion (e.g. sequential IDs that might be present
in the data)
b) it doesn't handle allowed lateness correctly (this is probably
what you are referring to)
Issue a) is sort of trivial; it is only a question of how to do this in
the way most compatible with the rest of the Beam APIs. Moreover, it can
easily be postponed and solved later, so I would ignore it for now.
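As a sketch of what the UDF from issue a) might look like (the names here are purely hypothetical, not an existing Beam API): the event timestamp stays the primary sort key, and the user-supplied extractor only breaks ties between elements with equal timestamps, e.g. via a sequence ID carried in the data:

```java
import java.util.*;
import java.util.function.ToLongFunction;

// Hypothetical shape of the secondary-sort UDF: sort by event timestamp
// first, then by a user-extracted fine-grained key (e.g. a sequence ID).
class TimeThenSequenceComparator {
    static <T> Comparator<T> of(ToLongFunction<T> timestampFn,
                                ToLongFunction<T> sequenceFn) {
        return Comparator.comparingLong(timestampFn)
                         .thenComparingLong(sequenceFn);
    }
}
```

The open API question is only where such an extractor would be attached (annotation parameter, builder method, ...), not how it sorts.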
Issue b) is much worse. The current approach is to force allowed
lateness to zero, dropping all late elements. I'm not fine with that
(and maybe as part of the review it will be concluded that a better
approach would be to actually wait until the watermark reaches T +
allowedLateness before flushing the buffer - consistent in dropping
only really late elements, but introducing more latency). There is a
'correct' solution to this, though. Very roughly outlined, it would be:
a) create a single input buffer - let's call it the "input buffer"
b) create two copies of the stateful DoFn consuming the sorted stream,
scoping all created state and timers with an additional dimension - one
copy is "on time" and the other is "late" - that is, state and timers
would reside in the namespace (k, v, "on time" | "late"); let's call
these two respective DoFns "on time" and "late"
c) on each watermark update, do the following:
i) take all elements with timestamp less than the watermark, sort them
and flush them out to the consuming "on time" stateful DoFn, _storing
all output produced by the DoFn into a separate state (let's call it
the "output buffer") and passing it downstream as well_
ii) take all elements with timestamp less than watermark -
allowedLateness, sort them and pass them to the consuming "late"
stateful DoFn, _storing all results into a "late output buffer" and
passing nothing into the output_
iii) compare the "output buffer" with the "late output buffer" and
*retract elements that were part of the output buffer but are not
present in the late output buffer, and output elements that were not in
the output buffer but are present in the late output buffer*
iv) trim the "output buffer", "late output buffer" and "input buffer"
to contain only elements with timestamp not preceding watermark -
allowedLateness
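To make the control flow of steps a) - c) concrete, here is a rough, self-contained simulation for a single key, with a running sum standing in for the user's stateful logic. All names are illustrative pseudo-runner code, not Beam API:

```java
import java.util.*;

// The "on time" copy flushes at the watermark and emits downstream; the
// "late" copy flushes at watermark - allowedLateness and only buffers.
// Diffing the two output buffers yields retractions.
class DualBufferRetraction {
    private final long allowedLateness;
    private long lastWatermark = Long.MIN_VALUE;
    private long lateHorizon = Long.MIN_VALUE;
    private long onTimeSum = 0, lateSum = 0;
    private final NavigableMap<Long, Long> inputBuffer = new TreeMap<>();
    private final NavigableMap<Long, Long> outputBuffer = new TreeMap<>();     // "on time"
    private final NavigableMap<Long, Long> lateOutputBuffer = new TreeMap<>(); // "late"
    // Downstream log: "+ts:sum" is an output, "-ts:sum" a retraction.
    final List<String> downstream = new ArrayList<>();

    DualBufferRetraction(long allowedLateness) {
        this.allowedLateness = allowedLateness;
    }

    void add(long ts, long value) {
        if (ts >= lateHorizon) {  // elements beyond allowedLateness are dropped
            inputBuffer.put(ts, value);
        }
    }

    void onWatermark(long watermark) {
        // i) "on time": consume newly on-time elements in timestamp order;
        //    late elements (ts < lastWatermark) are invisible to this copy.
        for (var e : inputBuffer.subMap(lastWatermark, true, watermark, false).entrySet()) {
            onTimeSum += e.getValue();
            outputBuffer.put(e.getKey(), onTimeSum);
            downstream.add("+" + e.getKey() + ":" + onTimeSum);
        }
        // ii) "late": consume everything below watermark - allowedLateness,
        //     late arrivals included; store results, emit nothing.
        for (var e : inputBuffer.headMap(watermark - allowedLateness, false).entrySet()) {
            lateSum += e.getValue();
            lateOutputBuffer.put(e.getKey(), lateSum);
        }
        // iii) diff the buffers: retract on-time outputs the late copy
        //      disagrees with, emit late outputs the on-time copy missed.
        for (var e : lateOutputBuffer.entrySet()) {
            Long onTime = outputBuffer.get(e.getKey());
            if (!e.getValue().equals(onTime)) {
                if (onTime != null) {
                    downstream.add("-" + e.getKey() + ":" + onTime);
                }
                downstream.add("+" + e.getKey() + ":" + e.getValue());
                outputBuffer.put(e.getKey(), e.getValue());
            }
        }
        // iv) trim all three buffers below watermark - allowedLateness;
        //     those elements can never change again.
        lateHorizon = watermark - allowedLateness;
        inputBuffer.headMap(lateHorizon, false).clear();
        outputBuffer.headMap(lateHorizon, false).clear();
        lateOutputBuffer.headMap(lateHorizon, false).clear();
        lastWatermark = watermark;
    }
}
```

With a running sum, one late element changes every later sum, so step iii) retracts the stale "on time" outputs and emits the corrected values, which is exactly the retraction behavior described above.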
I have (currently) a strong belief that this can actually serve as the
basis of a fully generic solution to the retraction problem (it would
only require actually handling all stateful operations as stateful
DoFns when retractions are enabled in the pipeline), with one piece
missing - the retractions would have to propagate to outputs as well.
That might still be tricky. Moreover, there are some rough edges, e.g.
the late data trigger might need to fire with every late data element
(similarly to the default trigger), and that would be more complicated.
Anyway, because Beam currently doesn't support retractions (though this
sorting approach might help a lot!) and I generally like small baby
steps towards fully general solutions, I'm proposing that the first
iteration either force allowedLateness to zero, or hold the outputs
back until the watermark passes T + allowedLateness. These two
approaches can be discussed as part of the PR review process.
Nevertheless, I'd like to get to the fully working solution in the future.
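The difference between the two proposed first iterations boils down to the flush condition applied to the sorting buffer. A minimal illustration (timestamps as plain longs, names hypothetical):

```java
// The two candidate first-iteration behaviors differ only in when a
// buffered element becomes eligible for flushing.
class FlushConditions {
    // Option 1: force allowedLateness to zero - flush as soon as the
    // watermark passes the element; anything arriving later is dropped.
    static boolean flushWithZeroLateness(long elementTs, long watermark) {
        return elementTs < watermark;
    }

    // Option 2: hold outputs back until the watermark passes
    // elementTs + allowedLateness - nothing within the allowed lateness
    // is dropped, at the cost of extra latency.
    static boolean flushAfterAllowedLateness(long elementTs, long watermark,
                                             long allowedLateness) {
        return elementTs + allowedLateness < watermark;
    }
}
```

Option 1 minimizes latency but silently drops late data; option 2 drops only really late data but delays every output by allowedLateness.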
Hope this was somehow understandable; I was trying to simplify things
as much as possible while still being able to capture the essence.
Jan
[1]
https://lists.apache.org/thread.html/6d2ea0cc0b5bbc5286d32442b852b25262b834be405956af7d5efac2@%3Cdev.beam.apache.org%3E
[2] https://github.com/apache/beam/pull/8774
On 11/15/19 7:01 AM, Kenneth Knowles wrote:
On Tue, Nov 12, 2019 at 1:36 AM Jan Lukavský <je...@seznam.cz
<mailto:je...@seznam.cz>> wrote:
Hi,
this is a follow up of multiple threads covering the topic of how to
(in a unified way) process event streams. Event streams can be
characterized by a common property that ordering of events matters.
1. events are ordered (hence timestamps)
2. most operators do not depend on order / operators that depend on
some order do not depend on the total order
3. real-world data is inherently likely to have a distribution of
disorder that has an unboundedly long tail
The processing
(usually) looks something like
unordered stream -> buffer (per key) -> ordered stream -> stateful
logic (DoFn)
This is a pre-watermark, pre-Beam approach to processing data. It
drops more data and/or introduces more latency, but can lead to
simpler or more efficient operator implementations (but not always).
I do think it seems OK to add this to Beam in some form as a
convenience when the user knows something about their data and/or
DoFn. The risk of course is that users go for this when they
shouldn't, thinking it is simpler without considering the complexity
of how to avoid dropping too much data.
This thread seems to be a continuation of the other thread I just
responded to. It would be good to try to keep them tied together to
avoid duplicate responses.
Kenn
This is perfectly fine and can be solved by the current tools Beam
offers (state & timers), but *only for the streaming case*. The batch
case is essentially broken, because:
a) out-of-orderness is essentially *unbounded* (as opposed to the
input being bounded - strangely, that is not a contradiction), while
out-of-orderness in the streaming case is *bounded*, because the
watermark can fall behind only a limited amount of time (sooner or
later, nobody would actually care about results from a streaming
pipeline being months or years late, right?)
b) with unbounded out-of-orderness, the spatial requirements of state
grow with O(N) in the worst case, where N is the size of the whole input
c) moreover, many runners restrict the size of state per key to fit
in memory (spark, flink)
Now, solutions to these problems seem to be:
1) refine the model guarantees for batch stateful processing so that
we limit the out-of-orderness (the source of issues here) - the only
reasonable way to do that is to enforce sorting before all stateful
dofns in the batch case (perhaps with an opt-out), or
2) define a way to mark a stateful dofn as requiring the sorting
(e.g. @RequiresTimeSortedInput) - note this has to be done for both
the batch and streaming case, as opposed to 1), or
3) define a different URN for an "ordered stateful dofn", with a
default expansion using state as a buffer (for both the batch and
streaming case) - that way it can be overridden in batch runners that
could get into trouble otherwise (and this could be regarded as a sort
of natural extension of the current approach).
I still think that the best solution is 1), for multiple reasons
ranging from being internally logically consistent to being practical
and easily implemented (a few lines of code in flink's case, for
instance). On the other hand, if this is really not what we want to do,
then I'd like to know the community's opinion on the two other options
(or on some other option I didn't cover).
Many thanks for opinions and help with fixing what is (sort of) broken
right now.
Jan