> I will not try to formalize this notion in this email. But I will
note that since it is universally assured, it would be zero cost and
significantly safer to formalize it and add an annotation noting it was
required. It has nothing to do with event time ordering, only trigger
firing ordering.
I cannot agree with the last sentence (and I'm really not doing this on
purpose :-)). Panes generally arrive out of order, as mentioned several
times in the discussions linked from this thread. If we want to ensure
"trigger firing ordering", we can use the pane index, that is correct.
But - that is actually equivalent to sorting by event time, because pane
index order will be (nearly) the same as event time order. This is due
to the fact, that pane index and event time correlate (both are
monotonic). The pane index "only" solves the issue of preserving
ordering even in case where there are multiple firings within the same
timestamp (regardless of granularity). This was mentioned in the initial
discussion about event time ordering, and is part of the design doc -
users should be allowed to provide UDF for extracting time-correlated
ordering field (which means ability to choose a preferred, or
authoritative, observer which assigns unambiguous ordering to events).
Example of this might include Kafka offsets as well, or any queue index
for that matter. This is not yet implemented, but could (should) be in
the future.
The only case where these two things are (somewhat) different is the
case mentioned by @Steve - if the output is stateless ParDo, which will
get fused. But that is only because the processing is single-threaded
per key, and therefore the ordering is implied by timer ordering (and
careful here, many runners don't have this ordering 100% correct, as of
now - this problem luckily appears only when there are multiple timers
per key). Moreover, if there should be a failure, then the output might
(would) get back in time anyway. If there would be a shuffle operation
after GBK/Combine, then the ordering is no longer guaranteed and must be
explicitly taken care of.
Last note, I must agree with @Rui that all these discussions are very
much related to retractions (precisely the ability to implement them).
Jan
On 11/26/19 7:34 AM, Kenneth Knowles wrote:
Hi Aaron,
Another insightful observation.
Whenever an aggregation (GBK / Combine per key) has a trigger firing,
there is a per-key sequence number attached. It is included in
metadata known as "PaneInfo" [1]. The value of PaneInfo.getIndex() is
colloquially referred to as the "pane index". You can also make use of
the "on time index" if you like. The best way to access this metadata
is to add a parameter of type PaneInfo to your DoFn's @ProcessElement
method. This works for stateful or stateless DoFn.
Most of Beam's IO connectors do not explicitly enforce that outputs
occur in pane index order but instead rely on the hope that the runner
delivers panes in order to the sink. IMO this is dangerous but it has
not yet caused a known issue. In practice, each "input key to output
key 'path' " through a pipeline's logic does preserve order for all
existing runners AFAIK and it is the formalization that is missing. It
is related to an observation by +Rui Wang
<mailto:ruw...@google.com> that processing retractions requires the
same key-to-key ordering.
I will not try to formalize this notion in this email. But I will note
that since it is universally assured, it would be zero cost and
significantly safer to formalize it and add an annotation noting it
was required. It has nothing to do with event time ordering, only
trigger firing ordering.
Kenn
[1]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
[2]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L557
On Mon, Nov 25, 2019 at 4:06 PM Pablo Estrada <pabl...@google.com
<mailto:pabl...@google.com>> wrote:
The blog posts on stateful and timely computation with Beam should
help clarify a lot about how to use state and timers to do this:
https://beam.apache.org/blog/2017/02/13/stateful-processing.html
https://beam.apache.org/blog/2017/08/28/timely-processing.html
You'll see there how there's an implicit per-single-element
grouping for each key, so state and timers should support your use
case very well.
Best
-P.
On Mon, Nov 25, 2019 at 3:47 PM Steve Niemitz <sniem...@apache.org
<mailto:sniem...@apache.org>> wrote:
If you have a pipeline that looks like Input -> GroupByKey ->
ParDo, while it is not guaranteed, in practice the sink will
observe the trigger firings in order (per key), since it'll be
fused to the output of the GBK operation (in all runners I
know of).
There have been a couple threads about trigger ordering as
well on the list recently that might have more information:
https://lists.apache.org/thread.html/b61a908289a692dbd80dd6a869759eacd45b308cb3873bfb77c4def6@%3Cdev.beam.apache.org%3E
https://lists.apache.org/thread.html/20d11046d26174969ef44a781e409a1cb9f7c736e605fa40fdf98397@%3Cuser.beam.apache.org%3E
On Mon, Nov 25, 2019 at 5:52 PM Aaron Dixon <atdi...@gmail.com
<mailto:atdi...@gmail.com>> wrote:
@Jan @Pablo Thank you
@Pablo In this case it's a single global windowed
Combine/perKey, triggered per element. Keys are few
(client accounts) so they can live forever.
It looks like just by virtue of using a stateful ParDo I
could get this final execution to be "serialized" per key.
(Then I could simply do the compare-and-swap using Beam's
state mechanism to keep track of the "latest trigger
timestamp" instead of having to orchestrate
compare-and-swap in the target store :thinking:.)
On Mon, Nov 25, 2019 at 4:14 PM Jan Lukavský
<je...@seznam.cz <mailto:je...@seznam.cz>> wrote:
One addition, to make the list of options exhaustive,
there is probably
one more option
c) create a ParDo keyed by primary key of your sink,
cache the last
write in there and compare it locally, without the
need to query the
database
It would still need some timer to clear values after
watermark + allowed
lateness, because otherwise you would have to cache
your whole database
on workers. But because you don't need actual
ordering, you just need
the most recent value (if I got it right) this might
be an option.
Jan
On 11/25/19 10:53 PM, Jan Lukavský wrote:
> Hi Aaron,
>
> maybe someone else will give another option, but if
I understand
> correctly what you want to solve, then you
essentially have to do either:
>
> a) use the compare & swap mechanism in the sink you
described
>
> b) use a buffer to buffer elements inside the
outputting ParDo and
> only output them when watermark passes (using a timer).
>
> There is actually an ongoing discussion about how to
make option b)
> user-friendly and part of Beam itself, but currently
there is no
> out-of-the-box solution for that.
>
> Jan
>
> On 11/25/19 10:27 PM, Aaron Dixon wrote:
>> Suppose I trigger a Combine per-element (in a
high-volume stream) and
>> use a ParDo as a sink.
>>
>> I assume there is no guarantee about the order that
my ParDo will see
>> these triggers, especially as it processes in
parallel, anyway.
>>
>> That said, my sink writes to a db or cache and I
would not like the
>> cache to ever regress its value to something
"before" what it has
>> already written.
>>
>> Is the best way to solve this problem to always
write the event-time
>> in the cache and do a compare-and-swap only
updating the sink if the
>> triggered value in-hand is later than the target value?
>>
>> Or is there a better way to guarantee that my ParDo
sink will process
>> elements in-order? (Eg, if I can give up
per-event/real-time, then a
>> delay-based trigger would probably be sufficient I
imagine.)
>>
>> Thanks for advice!