On Tue, Nov 26, 2019 at 1:00 AM Jan Lukavský wrote:
> I will not try to formalize this notion in this email. But I
will note that since it is universally assured, it would be zero
cost and significantly safer to formalize it and add an annotation
noting it was required. It has nothing to do with event time
ordering, only trigger firing ordering.
I cannot agree with the last sentence (and I'm really not doing
this on purpose :-)). Panes generally arrive out of order, as
mentioned several times in the discussions linked from this
thread. If we want to ensure "trigger firing ordering", we can use
the pane index, that is correct. But - that is actually equivalent
to sorting by event time, because pane index order will be
(nearly) the same as event time order. This is because pane index
and event time correlate (both are monotonic).
Trigger firings can have decreasing event timestamps w/ the minimum
timestamp combiner*. I do think the issue at hand is best analyzed in
terms of the explicit ordering on panes. And I do think we need to
have an explicit guarantee or annotation strong enough to describe a
correct-under-all-allowed runners sink. Today an antagonistic runner
could probably break a lot of things.
Kenn
*In fact, they can decrease even with the "maximum" timestamp
combiner, because timestamp combiners actually apply only to the
elements in that particular pane. This is weird, and maybe a design
bug, but good to know about.
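To make the footnote concrete, here is a minimal plain-Java sketch (no Beam dependency) of how per-pane output timestamps can go backwards. It assumes, as the footnote describes, that the combiner sees only the elements of the pane being fired. `PaneTimestampSketch` and its methods are hypothetical names for illustration, not Beam API.

```java
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical model, not Beam API: a pane's output timestamp is
 * computed only from the element timestamps inside that pane.
 */
final class PaneTimestampSketch {
    /** Models a "maximum"-style combiner applied to one pane in isolation. */
    static long latestInPane(List<Long> elementTimestamps) {
        return Collections.max(elementTimestamps);
    }

    /** Models a "minimum"-style combiner applied to one pane in isolation. */
    static long earliestInPane(List<Long> elementTimestamps) {
        return Collections.min(elementTimestamps);
    }
}
```

If pane 0 holds elements at timestamps 10 and 12, and a later pane 1 holds late elements at 5 and 7, the "latest" result drops from 12 to 7 and the "earliest" result drops from 10 to 5, so the second firing carries a smaller timestamp than the first under either combiner.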
The pane index "only" solves the issue of preserving ordering even
in the case where there are multiple firings within the same timestamp
(regardless of granularity). This was mentioned in the initial
discussion about event time ordering, and is part of the design
doc - users should be allowed to provide a UDF for extracting a
time-correlated ordering field (which means the ability to choose a
preferred, or authoritative, observer which assigns unambiguous
ordering to events). Examples of this might include Kafka offsets
as well, or any queue index for that matter. This is not yet
implemented, but could (should) be in the future.
The only case where these two things are (somewhat) different is
the case mentioned by @Steve - if the output is a stateless ParDo,
which will get fused. But that is only because the processing is
single-threaded per key, and therefore the ordering is implied by
timer ordering (and careful here, many runners don't have this
ordering 100% correct, as of now - this problem luckily appears
only when there are multiple timers per key). Moreover, if there
should be a failure, then the output might (would) go back in
time anyway. If there were a shuffle operation after
GBK/Combine, then the ordering would no longer be guaranteed and
would have to be explicitly taken care of.
Last note, I must agree with @Rui that all these discussions are
very much related to retractions (precisely the ability to
implement them).
Jan
On 11/26/19 7:34 AM, Kenneth Knowles wrote:
Hi Aaron,
Another insightful observation.
Whenever an aggregation (GBK / Combine per key) has a trigger
firing, there is a per-key sequence number attached. It is
included in metadata known as "PaneInfo" [1]. The value of
PaneInfo.getIndex() is colloquially referred to as the "pane
index". You can also make use of the "on time index" if you like.
The best way to access this metadata is to add a parameter of
type PaneInfo to your DoFn's @ProcessElement method [2]. This works
for stateful or stateless DoFn.
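As a rough illustration of what a sink could do with the pane index, the plain-Java sketch below (deliberately free of Beam dependencies) drops any firing whose index is not greater than the last one written for the same key. `PaneIndexGuard` and `shouldWrite` are hypothetical names. In a real pipeline the index would come from `PaneInfo.getIndex()`, and the per-key map would more likely be kept in Beam state than in a local `HashMap`.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper, not Beam API: remembers the highest pane index
 * written per key and rejects firings that arrive out of order.
 * Assumes calls for a given key are not concurrent.
 */
final class PaneIndexGuard {
    private final Map<String, Long> lastWrittenIndex = new HashMap<>();

    /** Returns true if this firing is newer than anything written for the key. */
    boolean shouldWrite(String key, long paneIndex) {
        Long last = lastWrittenIndex.get(key);
        if (last != null && paneIndex <= last) {
            return false; // stale or duplicate firing: skip the write
        }
        lastWrittenIndex.put(key, paneIndex);
        return true;
    }
}
```

A sink would call `shouldWrite(key, index)` before each write and skip the write when it returns false, so a pane that arrives late never overwrites a newer one.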
Most of Beam's IO connectors do not explicitly enforce that
outputs occur in pane index order but instead rely on the hope
that the runner delivers panes in order to the sink. IMO this is
dangerous but it has not yet caused a known issue. In practice,
each "input key to output key 'path'" through a pipeline's logic
does preserve order for all existing runners AFAIK and it is the
formalization that is missing. It is related to an observation by
+Rui Wang that processing retractions
requires the same key-to-key ordering.
I will not try to formalize this notion in this email. But I will
note that since it is universally assured, it would be zero cost
and significantly safer to formalize it and add an annotation
noting it was required. It has nothing to do with event time
ordering, only trigger firing ordering.
Kenn
[1]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
[2]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L557
On Mon, Nov 25, 2019 at 4:06 PM Pablo Estrada wrote:
The blog posts on stateful and timely computation with Beam
should help clarify a lot about how to use state and timers
to do this:
https://beam.apache.org/blog/2017/02/13/stateful-processing.html
https://beam.apache.org/blog/2017/08/28/timely-processing.html
You'll see there how there's an implicit per-single-element
grouping for each key, so state and timers should support
your use case very well.
Best
-P.
On Mon, Nov 25, 2019 at 3:47 PM Steve Niemitz wrote:
If you have a pipeline that looks like Input ->
GroupByKey -> ParDo, while it is not guaranteed, in
practice the sink will observe the trigger firings in
order (per key), since it'll be fused to the output of
the GBK operation (in all runners I know of).
There have been a couple threads about trigger ordering
as well on the list recently that might have more
information:
https://lists.apache.org/thread.html/b61a908289a692dbd80dd6a869759eacd45b308cb3873bfb77c4def6@%3Cdev.beam.apache.org%3E
https://lists.apache.org/thread.html/20d11046d26174969ef44a781e409a1cb9f7c736e605fa40fdf98397@%3Cuser.beam.apache.org%3E
On Mon, Nov 25, 2019 at 5:52 PM Aaron Dixon wrote:
@Jan @Pablo Thank you
@Pablo In this case it's a single global windowed
Combine/perKey, triggered per element. Keys are few
(client accounts) so they can live forever.
It looks like just by virtue of using a stateful
ParDo I could get this final execution to be
"serialized" per key. (Then I could simply do the
compare-and-swap using Beam's state mechanism to keep
track of the "latest trigger timestamp" instead of
having to orchestrate compare-and-swap in the target
store :thinking:.)
On Mon, Nov 25, 2019 at 4:14 PM Jan Lukavský wrote:
One addition, to make the list of options exhaustive, there is
probably one more option:
c) create a ParDo keyed by primary key of your sink, cache the last
write in there and compare it locally, without the need to query
the database
It would still need some timer to clear values after watermark +
allowed lateness, because otherwise you would have to cache your
whole database on workers. But because you don't need actual
ordering, you just need the most recent value (if I got it right),
this might be an option.
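A minimal sketch of option c) in plain Java, without any Beam dependency. It assumes one cache instance per worker, timestamps as plain `long` values, and a cleanup call that stands in for the timer. `LastWriteCache`, `offer` and `onWatermark` are hypothetical names; in Beam the map would presumably be per-key state and the cleanup an event-time timer.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical model of option c), not Beam API: cache the timestamp
 * of the last write per key and compare locally before writing.
 */
final class LastWriteCache {
    private final Map<String, Long> lastWrittenTimestamp = new HashMap<>();
    private final long allowedLateness;

    LastWriteCache(long allowedLateness) {
        this.allowedLateness = allowedLateness;
    }

    /** Returns true if the value is newer than the last write for this key. */
    boolean offer(String key, long eventTimestamp) {
        Long last = lastWrittenTimestamp.get(key);
        if (last != null && eventTimestamp <= last) {
            return false; // the sink already holds something at least as recent
        }
        lastWrittenTimestamp.put(key, eventTimestamp);
        return true;
    }

    /** Models the cleanup timer: forget entries once the watermark passes timestamp + allowed lateness. */
    void onWatermark(long watermark) {
        lastWrittenTimestamp.values().removeIf(ts -> ts + allowedLateness < watermark);
    }

    int size() {
        return lastWrittenTimestamp.size();
    }
}
```

The cleanup relies on the assumption that data older than the allowed lateness no longer reaches the sink. Without that, a very late element could be accepted after its key's entry has been evicted.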
Jan
On 11/25/19 10:53 PM, Jan Lukavský wrote:
> Hi Aaron,
>
> maybe someone else will give another option, but if I understand
> correctly what you want to solve, then you essentially have to do either:
>
> a) use the compare & swap mechanism in the sink you described
>
> b) use a buffer to buffer elements inside the outputting ParDo and
> only output them when watermark passes (using a timer).
>
> There is actually an ongoing discussion about how to make option b)
> user-friendly and part of Beam itself, but currently there is no
> out-of-the-box solution for that.
>
> Jan
>
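Option b) could look roughly like the following plain-Java sketch, which keeps no Beam dependency. It assumes elements carry a `long` timestamp and that "the watermark passes" means the watermark is strictly greater than the element's timestamp. `WatermarkBuffer` and `advanceWatermark` are hypothetical names standing in for per-key state plus an event-time timer callback.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/**
 * Hypothetical model of option b), not Beam API: hold elements back
 * and release them in timestamp order once the watermark has passed them.
 */
final class WatermarkBuffer {
    private static final class Timestamped {
        final long timestamp;
        final String value;

        Timestamped(long timestamp, String value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Smallest timestamp first; order among equal timestamps is unspecified.
    private final PriorityQueue<Timestamped> buffer =
        new PriorityQueue<>(Comparator.comparingLong((Timestamped t) -> t.timestamp));

    void add(long timestamp, String value) {
        buffer.add(new Timestamped(timestamp, value));
    }

    /** Models the timer firing: emits everything strictly before the watermark, oldest first. */
    List<String> advanceWatermark(long watermark) {
        List<String> released = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek().timestamp < watermark) {
            released.add(buffer.poll().value);
        }
        return released;
    }
}
```

The cost is latency: nothing is written until the watermark moves past it, which is the trade-off against the compare-and-swap approach in option a).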
> On 11/25/19 10:27 PM, Aaron Dixon wrote:
>> Suppose I trigger a Combine per-element (in a high-volume stream) and
>> use a ParDo as a sink.
>>
>> I assume there is no guarantee about the order that my ParDo will see
>> these triggers, especially as it processes in parallel, anyway.
>>
>> That said, my sink writes to a db or cache and I would not like the
>> cache to ever regress its value to something "before" what it has
>> already written.
>>
>> Is the best way to solve this problem to always write the event-time
>> in the cache and do a compare-and-swap only updating the sink if the
>> triggered value in-hand is later than the target value?
>>
>> Or is there a better way to guarantee that my ParDo sink will process
>> elements in-order? (Eg, if I can give up per-event/real-time, then a
>> delay-based trigger would probably be sufficient I imagine.)
>>
>> Thanks for advice!
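For reference, the compare-and-swap idea from the original question can be sketched in plain Java as below. An in-memory `ConcurrentHashMap` stands in for the external db or cache, which is an assumption made for illustration only; a real store would need its own conditional-write primitive. `EventTimeCasStore` and `writeIfNewer` are hypothetical names.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Hypothetical stand-in for the target store: every value is stored
 * together with its event time, and a write only wins if it is later.
 */
final class EventTimeCasStore {
    private static final class Versioned {
        final long eventTime;
        final String value;

        Versioned(long eventTime, String value) {
            this.eventTime = eventTime;
            this.value = value;
        }
    }

    private final ConcurrentMap<String, Versioned> store = new ConcurrentHashMap<>();

    /** Atomically keeps whichever of the stored and incoming values has the later event time. */
    boolean writeIfNewer(String key, long eventTime, String value) {
        Versioned candidate = new Versioned(eventTime, value);
        Versioned winner = store.merge(
            key,
            candidate,
            (current, incoming) -> incoming.eventTime > current.eventTime ? incoming : current);
        return winner == candidate; // true only if the candidate was stored
    }

    /** Returns the current value for the key, or null if none was written. */
    String read(String key) {
        Versioned v = store.get(key);
        return v == null ? null : v.value;
    }
}
```

Because `merge` on a `ConcurrentHashMap` is atomic per key, parallel writers cannot make the stored value regress, regardless of the order in which the firings arrive.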