Hi Aaron,

Another insightful observation.

Whenever an aggregation (GBK / Combine per key) has a trigger firing,
a per-key sequence number is attached. It is included in metadata
known as "PaneInfo" [1]. The value of PaneInfo.getIndex() is
colloquially referred to as the "pane index". You can also make use of
the "on time index" (PaneInfo.getNonSpeculativeIndex()) if you like.
The best way to access this metadata is to add a parameter of type
PaneInfo to your DoFn's @ProcessElement method [2]. This works for
both stateful and stateless DoFns.
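
For a concrete picture, here is a minimal sketch (the element type and
class name are made up for illustration):

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.windowing.PaneInfo;
    import org.apache.beam.sdk.values.KV;

    class PaneIndexFn extends DoFn<KV<String, Long>, KV<String, Long>> {
      @ProcessElement
      public void process(
          @Element KV<String, Long> element,
          PaneInfo pane,
          OutputReceiver<KV<String, Long>> out) {
        // Zero-based sequence number of this firing, per key and window.
        long paneIndex = pane.getIndex();
        // "On time index": zero-based among non-speculative panes only;
        // -1 for early (speculative) firings.
        long onTimeIndex = pane.getNonSpeculativeIndex();
        out.output(element);
      }
    }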

Most of Beam's IO connectors do not explicitly enforce that outputs
occur in pane-index order; instead they rely on the hope that the
runner delivers panes to the sink in order. IMO this is dangerous, but
it has not yet caused a known issue. In practice, each
input-key-to-output-key "path" through a pipeline's logic does
preserve order on all existing runners, AFAIK; it is only the
formalization that is missing. This is related to an observation by
Rui Wang <[email protected]> that processing retractions requires the
same key-to-key ordering.
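
To make the idea concrete, here is a sketch of what a defensive sink
could do with the pane index; none of these names exist in Beam, and
it assumes a keyed input (which a stateful DoFn requires anyway):

    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.ValueState;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.windowing.PaneInfo;
    import org.apache.beam.sdk.values.KV;

    class LatestPaneOnlyFn extends DoFn<KV<String, Long>, KV<String, Long>> {
      // Highest pane index emitted so far for the current key and window.
      @StateId("lastPaneIndex")
      private final StateSpec<ValueState<Long>> lastPaneIndexSpec =
          StateSpecs.value();

      @ProcessElement
      public void process(
          @Element KV<String, Long> element,
          PaneInfo pane,
          @StateId("lastPaneIndex") ValueState<Long> lastIndex,
          OutputReceiver<KV<String, Long>> out) {
        Long last = lastIndex.read(); // null on the first firing for a key
        if (last == null || pane.getIndex() > last) {
          lastIndex.write(pane.getIndex());
          out.output(element); // strictly newer pane, safe to write
        }
        // Otherwise drop the stale or duplicate pane rather than let the
        // sink regress to an older value.
      }
    }

This is essentially the compare-and-swap idea discussed further down
the thread, done in pipeline state rather than in the target store.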

I will not try to formalize this notion in this email. But I will note
that since this ordering is already universally provided, it would be
zero-cost and significantly safer to formalize it and add an
annotation for transforms that require it. It has nothing to do with
event-time ordering, only trigger-firing ordering.

Kenn

[1]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
[2]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L557


On Mon, Nov 25, 2019 at 4:06 PM Pablo Estrada <[email protected]> wrote:

> The blog posts on stateful and timely computation with Beam should help
> clarify a lot about how to use state and timers to do this:
> https://beam.apache.org/blog/2017/02/13/stateful-processing.html
> https://beam.apache.org/blog/2017/08/28/timely-processing.html
>
> You'll see there how there's an implicit per-single-element grouping for
> each key, so state and timers should support your use case very well.
>
> Best
> -P.
>
> On Mon, Nov 25, 2019 at 3:47 PM Steve Niemitz <[email protected]> wrote:
>
>> If you have a pipeline that looks like Input -> GroupByKey -> ParDo,
>> while it is not guaranteed, in practice the sink will observe the trigger
>> firings in order (per key), since it'll be fused to the output of the GBK
>> operation (in all runners I know of).
>>
>> There have been a couple threads about trigger ordering as well on the
>> list recently that might have more information:
>>
>> https://lists.apache.org/thread.html/b61a908289a692dbd80dd6a869759eacd45b308cb3873bfb77c4def6@%3Cdev.beam.apache.org%3E
>>
>> https://lists.apache.org/thread.html/20d11046d26174969ef44a781e409a1cb9f7c736e605fa40fdf98397@%3Cuser.beam.apache.org%3E
>>
>>
>> On Mon, Nov 25, 2019 at 5:52 PM Aaron Dixon <[email protected]> wrote:
>>
>>> @Jan @Pablo Thank you
>>>
>>> @Pablo In this case it's a single global windowed Combine/perKey,
>>> triggered per element. Keys are few (client accounts) so they can live
>>> forever.
>>>
>>> It looks like just by virtue of using a stateful ParDo I could get this
>>> final execution to be "serialized" per key. (Then I could simply do the
>>> compare-and-swap using Beam's state mechanism to keep track of the "latest
>>> trigger timestamp" instead of having to orchestrate compare-and-swap in the
>>> target store :thinking:.)
>>>
>>>
>>>
>>> On Mon, Nov 25, 2019 at 4:14 PM Jan Lukavský <[email protected]> wrote:
>>>
>>>> One addition, to make the list of options exhaustive, there is probably
>>>> one more option
>>>>
>>>>   c) create a ParDo keyed by the primary key of your sink, cache the
>>>> last write in there, and compare it locally, without the need to
>>>> query the database.
>>>>
>>>> It would still need some timer to clear values after watermark +
>>>> allowed lateness, because otherwise you would have to cache your
>>>> whole database on workers. But because you don't need actual
>>>> ordering, just the most recent value (if I got it right), this might
>>>> be an option.
>>>>
>>>> Jan
>>>>
>>>> On 11/25/19 10:53 PM, Jan Lukavský wrote:
>>>> > Hi Aaron,
>>>> >
>>>> > maybe someone else will give another option, but if I understand
>>>> > correctly what you want to solve, then you essentially have to do
>>>> > either:
>>>> >
>>>> >  a) use the compare & swap mechanism in the sink you described
>>>> >
>>>> >  b) buffer elements inside the outputting ParDo and only output
>>>> > them when the watermark passes (using a timer).
>>>> >
>>>> > There is actually an ongoing discussion about how to make option b)
>>>> > user-friendly and part of Beam itself, but currently there is no
>>>> > out-of-the-box solution for that.
>>>> >
>>>> > Jan
>>>> >
>>>> > On 11/25/19 10:27 PM, Aaron Dixon wrote:
>>>> >> Suppose I trigger a Combine per-element (in a high-volume stream)
>>>> >> and use a ParDo as a sink.
>>>> >>
>>>> >> I assume there is no guarantee about the order in which my ParDo
>>>> >> will see these trigger firings, especially as it processes in
>>>> >> parallel, anyway.
>>>> >>
>>>> >> That said, my sink writes to a db or cache and I would not like the
>>>> >> cache to ever regress its value to something "before" what it has
>>>> >> already written.
>>>> >>
>>>> >> Is the best way to solve this problem to always write the event-time
>>>> >> in the cache and do a compare-and-swap only updating the sink if the
>>>> >> triggered value in-hand is later than the target value?
>>>> >>
>>>> >> Or is there a better way to guarantee that my ParDo sink will
>>>> >> process elements in order? (E.g., if I can give up
>>>> >> per-event/real-time, then a delay-based trigger would probably be
>>>> >> sufficient, I imagine.)
>>>> >>
>>>> >> Thanks for advice!
>>>>
>>>
