On Tue, Nov 26, 2019 at 1:00 AM Jan Lukavský <[email protected]> wrote:
> I will not try to formalize this notion in this email. But
> I will note that since it is universally assured, it would be
> zero cost and significantly safer to formalize it and add an
> annotation noting it was required. It has nothing to do with
> event time ordering, only trigger firing ordering.
I cannot agree with the last sentence (and I'm really not
doing this on purpose :-)). Panes generally arrive out of
order, as mentioned several times in the discussions linked
from this thread. If we want to ensure "trigger firing
ordering", we can use the pane index, that is correct. But -
that is actually equivalent to sorting by event time, because
pane index order will be (nearly) the same as event time
order. This is due to the fact that pane index and event
time correlate (both are monotonically increasing).
Trigger firings can have decreasing event timestamps w/ the
minimum timestamp combiner*. I do think the issue at hand is best
analyzed in terms of the explicit ordering on panes. And I do
think we need to have an explicit guarantee or annotation strong
enough to describe a sink that is correct under all allowed runners.
Today an antagonistic runner could probably break a lot of things.
Kenn
*In fact, they can decrease even via the "maximum" timestamp
combiner, because timestamp combiners only apply to the elements
in that particular pane. This is weird, and maybe a design bug, but
good to know about.
The pane index "only" solves the issue of preserving ordering
even in case where there are multiple firings within the same
timestamp (regardless of granularity). This was mentioned in
the initial discussion about event time ordering, and is part
of the design doc - users should be allowed to provide a UDF
for extracting a time-correlated ordering field (which means
the ability to choose a preferred, or authoritative, observer
which assigns unambiguous ordering to events). Examples of
this include Kafka offsets, or any queue index
for that matter. This is not yet implemented, but could
(should) be in the future.
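To make that concrete, here is a minimal sketch of what such a user-provided extractor could look like. This hook does not exist in Beam today, and every name below (the interface, the record type) is hypothetical:

```java
// Hypothetical sketch only: this UDF hook is not part of Beam today.
// The idea is a user-provided function that extracts a time-correlated,
// unambiguous ordering field from each element, e.g. a Kafka offset
// or any other queue index assigned by a single authoritative observer.
interface OrderingFieldFn<T> {
  long orderingFieldOf(T element);
}

// A made-up record type standing in for a Kafka-style message.
class QueueRecord {
  final long offset;     // monotonic per partition, so it induces a total order
  final String payload;

  QueueRecord(long offset, String payload) {
    this.offset = offset;
    this.payload = payload;
  }
}

class OffsetOrdering implements OrderingFieldFn<QueueRecord> {
  @Override
  public long orderingFieldOf(QueueRecord r) {
    return r.offset;
  }
}
```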
The only case where these two things are (somewhat) different
is the case mentioned by @Steve - if the output is a stateless
ParDo, which will get fused. But that is only because the
processing is single-threaded per key, and therefore the
ordering is implied by timer ordering (and careful here, many
runners don't have this ordering 100% correct as of now -
luckily this problem appears only when there are multiple
timers per key). Moreover, if there is a failure, the
output might (would) go back in time anyway. If there
is a shuffle operation after the GBK/Combine, then the
ordering is no longer guaranteed and must be explicitly taken
care of.
Last note, I must agree with @Rui that all these discussions
are very much related to retractions (precisely, to the
ability to implement them).
Jan
On 11/26/19 7:34 AM, Kenneth Knowles wrote:
Hi Aaron,
Another insightful observation.
Whenever an aggregation (GBK / Combine per key) has a
trigger firing, there is a per-key sequence number attached.
It is included in metadata known as "PaneInfo" [1]. The
value of PaneInfo.getIndex() is colloquially referred to as
the "pane index". You can also make use of the "on time
index" if you like. The best way to access this metadata is
to add a parameter of type PaneInfo to your
DoFn's @ProcessElement method [2]. This works for stateful
and stateless DoFns.
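For example, a minimal sketch (the key/value types and class name are made up, but declaring a PaneInfo parameter on @ProcessElement is the real mechanism):

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.KV;

// Sketch: read the per-key pane index of the current trigger firing
// by adding a PaneInfo parameter to @ProcessElement.
class PaneLoggingFn extends DoFn<KV<String, Long>, String> {

  @ProcessElement
  public void process(ProcessContext ctx, PaneInfo pane) {
    // Per-key sequence number of this trigger firing.
    long index = pane.getIndex();
    boolean onTime = pane.getTiming() == PaneInfo.Timing.ON_TIME;
    ctx.output(describe(ctx.element().getKey(), index, onTime));
  }

  // Pure helper, factored out so it can be checked without a runner.
  static String describe(String key, long paneIndex, boolean onTime) {
    return key + " pane=" + paneIndex + (onTime ? " (on time)" : "");
  }
}
```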
Most of Beam's IO connectors do not explicitly enforce that
outputs occur in pane index order but instead rely on the
hope that the runner delivers panes in order to the sink.
IMO this is dangerous but it has not yet caused a known
issue. In practice, each "input key to output key" path
through a pipeline's logic does preserve order on all
existing runners AFAIK, and it is only the formalization that
is missing. It is related to an observation by +Rui Wang
that processing retractions requires the same key-to-key
ordering.
I will not try to formalize this notion in this email. But I
will note that since it is universally assured, it would be
zero cost and significantly safer to formalize it and add an
annotation noting it was required. It has nothing to do with
event time ordering, only trigger firing ordering.
Kenn
[1]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
[2]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L557
On Mon, Nov 25, 2019 at 4:06 PM Pablo Estrada
<[email protected] <mailto:[email protected]>> wrote:
The blog posts on stateful and timely computation with
Beam should help clarify a lot about how to use state
and timers to do this:
https://beam.apache.org/blog/2017/02/13/stateful-processing.html
https://beam.apache.org/blog/2017/08/28/timely-processing.html
You'll see there how there's an implicit
per-single-element grouping for each key, so state and
timers should support your use case very well.
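For illustration, a rough sketch of that pattern: buffer each key's values in a BagState and flush on an event-time timer once the watermark passes the window end. The class and state names are invented, and sorting by value here merely stands in for whatever deterministic order the sink needs:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;

// Sketch: buffer per-key values in state and only emit once the
// watermark passes the end of the window.
class BufferUntilWatermarkFn extends DoFn<KV<String, Long>, Long> {

  @StateId("buffer")
  private final StateSpec<BagState<Long>> bufferSpec = StateSpecs.bag();

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(ProcessContext ctx,
                      BoundedWindow window,
                      @StateId("buffer") BagState<Long> buffer,
                      @TimerId("flush") Timer flush) {
    buffer.add(ctx.element().getValue());
    // Event-time timer: fires once the watermark passes the window end.
    flush.set(window.maxTimestamp());
  }

  @OnTimer("flush")
  public void flush(OnTimerContext ctx,
                    @StateId("buffer") BagState<Long> buffer) {
    for (Long v : sorted(buffer.read())) {
      ctx.output(v);
    }
    buffer.clear();
  }

  // Pure helper, factored out so it can be checked without a runner.
  static List<Long> sorted(Iterable<Long> values) {
    List<Long> out = new ArrayList<>();
    for (Long v : values) {
      out.add(v);
    }
    Collections.sort(out);
    return out;
  }
}
```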
Best
-P.
On Mon, Nov 25, 2019 at 3:47 PM Steve Niemitz
<[email protected] <mailto:[email protected]>> wrote:
If you have a pipeline that looks like Input ->
GroupByKey -> ParDo, while it is not guaranteed, in
practice the sink will observe the trigger firings
in order (per key), since it'll be fused to the
output of the GBK operation (in all runners I know of).
There have been a couple threads about trigger
ordering as well on the list recently that might
have more information:
https://lists.apache.org/thread.html/b61a908289a692dbd80dd6a869759eacd45b308cb3873bfb77c4def6@%3Cdev.beam.apache.org%3E
https://lists.apache.org/thread.html/20d11046d26174969ef44a781e409a1cb9f7c736e605fa40fdf98397@%3Cuser.beam.apache.org%3E
On Mon, Nov 25, 2019 at 5:52 PM Aaron Dixon
<[email protected] <mailto:[email protected]>> wrote:
@Jan @Pablo Thank you
@Pablo In this case it's a single global
windowed Combine/perKey, triggered per element.
Keys are few (client accounts) so they can live
forever.
It looks like just by virtue of using a stateful
ParDo I could get this final execution to be
"serialized" per key. (Then I could simply do
the compare-and-swap using Beam's state
mechanism to keep track of the "latest trigger
timestamp" instead of having to orchestrate
compare-and-swap in the target store :thinking:.)
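Something like this minimal sketch (all names hypothetical; it assumes the triggered aggregate arrives as KV<accountId, value> and uses the element timestamp as the "latest trigger timestamp"):

```java
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Sketch: keep the compare-and-swap in Beam state instead of the
// target store, so only monotonically newer values reach the sink.
class LatestWinsSinkFn extends DoFn<KV<String, Long>, KV<String, Long>> {

  @StateId("latestTs")
  private final StateSpec<ValueState<Long>> latestTsSpec = StateSpecs.value();

  @ProcessElement
  public void process(ProcessContext ctx,
                      @StateId("latestTs") ValueState<Long> latestTs) {
    long incomingTs = ctx.timestamp().getMillis();
    if (isNewer(incomingTs, latestTs.read())) {
      latestTs.write(incomingTs);
      // Only now forward the element to the actual cache/db write.
      ctx.output(ctx.element());
    }
    // Older firings are dropped, so the store never regresses.
  }

  // Pure helper, factored out so it can be checked without a runner.
  static boolean isNewer(long incomingTs, Long storedTs) {
    return storedTs == null || incomingTs > storedTs;
  }
}
```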
On Mon, Nov 25, 2019 at 4:14 PM Jan Lukavský
<[email protected] <mailto:[email protected]>> wrote:
One addition, to make the list of options
exhaustive, there is probably one more option:

c) create a ParDo keyed by the primary key of
your sink, cache the last write in there, and
compare it locally, without the need to query
the database.

It would still need some timer to clear values
after watermark + allowed lateness, because
otherwise you would have to cache your whole
database on workers. But because you don't need
actual ordering, you just need the most recent
value (if I got it right), this might be an
option.
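A rough sketch of option c), with hypothetical names. The retention constant stands in for "watermark + allowed lateness", and the equality check stands in for whatever "most recent value" comparison the sink needs:

```java
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

// Sketch: cache the last written value per sink key in Beam state,
// skip unchanged writes, and clear the cache with an event-time
// timer so workers don't end up holding the whole database.
class CachedWriteFn extends DoFn<KV<String, Long>, KV<String, Long>> {

  // Stands in for "watermark + allowed lateness" from the discussion.
  private static final Duration RETENTION = Duration.standardHours(1);

  @StateId("lastWritten")
  private final StateSpec<ValueState<Long>> lastWrittenSpec = StateSpecs.value();

  @TimerId("gc")
  private final TimerSpec gcSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(ProcessContext ctx,
                      @StateId("lastWritten") ValueState<Long> lastWritten,
                      @TimerId("gc") Timer gc) {
    Long cached = lastWritten.read();
    Long incoming = ctx.element().getValue();
    if (changed(cached, incoming)) {
      lastWritten.write(incoming);
      ctx.output(ctx.element());  // only now hit the database
    }
    // Push the cleanup out past the newest element we have seen.
    gc.set(ctx.timestamp().plus(RETENTION));
  }

  @OnTimer("gc")
  public void gc(@StateId("lastWritten") ValueState<Long> lastWritten) {
    lastWritten.clear();
  }

  // Pure helper, factored out so it can be checked without a runner.
  static boolean changed(Long cached, Long incoming) {
    return cached == null || !cached.equals(incoming);
  }
}
```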
Jan
On 11/25/19 10:53 PM, Jan Lukavský wrote:
> Hi Aaron,
>
> maybe someone else will give another option, but if I understand
> correctly what you want to solve, then you essentially have to do
> either:
>
> a) use the compare & swap mechanism in the sink you described
>
> b) use a buffer to buffer elements inside the outputting ParDo and
> only output them when the watermark passes (using a timer).
>
> There is actually an ongoing discussion about how to make option b)
> user-friendly and part of Beam itself, but currently there is no
> out-of-the-box solution for that.
>
> Jan
>
> On 11/25/19 10:27 PM, Aaron Dixon wrote:
>> Suppose I trigger a Combine per-element (in a high-volume stream) and
>> use a ParDo as a sink.
>>
>> I assume there is no guarantee about the order that my ParDo will see
>> these triggers, especially as it processes in parallel, anyway.
>>
>> That said, my sink writes to a db or cache and I would not like the
>> cache to ever regress its value to something "before" what it has
>> already written.
>>
>> Is the best way to solve this problem to always write the event time
>> in the cache and do a compare-and-swap, only updating the sink if the
>> triggered value in hand is later than the target value?
>>
>> Or is there a better way to guarantee that my ParDo sink will process
>> elements in order? (E.g., if I can give up per-event/real-time
>> behavior, then a delay-based trigger would probably be sufficient, I
>> imagine.)
>>
>> Thanks for the advice!