On Tue, Nov 26, 2019 at 1:00 AM Jan Lukavský <[email protected]> wrote:

> > I will not try to formalize this notion in this email. But I will note
> that since it is universally assured, it would be zero cost and
> significantly safer to formalize it and add an annotation noting it was
> required. It has nothing to do with event time ordering, only trigger
> firing ordering.
>
> I cannot agree with the last sentence (and I'm really not doing this on
> purpose :-)). Panes generally arrive out of order, as mentioned several
> times in the discussions linked from this thread. If we want to ensure
> "trigger firing ordering", we can use the pane index, that is correct. But
> - that is actually equivalent to sorting by event time, because pane index
> order will be (nearly) the same as event time order. This is because pane
> index and event time correlate (both are monotonic).
>
Trigger firings can have decreasing event timestamps w/ the minimum
timestamp combiner*. I do think the issue at hand is best analyzed in terms
of the explicit ordering on panes. And I do think we need to have an
explicit guarantee or annotation strong enough to describe a
correct-under-all-allowed-runners sink. Today an antagonistic runner could
probably break a lot of things.

Kenn

*In fact, they can decrease via the "maximum" timestamp combiner, because
timestamp combiners actually only apply to the elements in that particular
pane. This is weird, and maybe a design bug, but good to know about.


> The pane index "only" solves the issue of preserving ordering even in case
> where there are multiple firings within the same timestamp (regardless of
> granularity). This was mentioned in the initial discussion about event time
> ordering, and is part of the design doc - users should be allowed to
> provide a UDF for extracting a time-correlated ordering field (which means
> the ability to choose a preferred, or authoritative, observer which
> assigns unambiguous ordering to events). Examples of this might include
> Kafka offsets, or any queue index for that matter. This is not yet
> implemented, but could (should) be in the future.
>
> The only case where these two things are (somewhat) different is the case
> mentioned by @Steve - if the output is stateless ParDo, which will get
> fused. But that is only because the processing is single-threaded per key,
> and therefore the ordering is implied by timer ordering (and careful here,
> many runners don't have this ordering 100% correct, as of now - this
> problem luckily appears only when there are multiple timers per key).
> Moreover, if there should be a failure, then the output might (would) go
> back in time anyway. If there would be a shuffle operation after
> GBK/Combine, then the ordering is no longer guaranteed and must be
> explicitly taken care of.
>
> Last note, I must agree with @Rui that all these discussions are very much
> related to retractions (precisely the ability to implement them).
>
> Jan
> On 11/26/19 7:34 AM, Kenneth Knowles wrote:
>
> Hi Aaron,
>
> Another insightful observation.
>
> Whenever an aggregation (GBK / Combine per key) has a trigger firing,
> there is a per-key sequence number attached. It is included in metadata
> known as "PaneInfo" [1]. The value of PaneInfo.getIndex() is colloquially
> referred to as the "pane index". You can also make use of the "on time
> index" if you like. The best way to access this metadata is to add a
> parameter of type PaneInfo to your DoFn's @ProcessElement method. This
> works for stateful or stateless DoFn.
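A minimal sketch of what this looks like, assuming the Beam Java SDK (the
class name and the comments about what you might do with the index are
illustrative, not part of any existing connector):

```java
// Sketch: reading the pane index inside a DoFn (Beam Java SDK).
// The runner populates the PaneInfo parameter for each element.
class PaneAwareSink extends DoFn<KV<String, Long>, Void> {
  @ProcessElement
  public void process(@Element KV<String, Long> element, PaneInfo pane) {
    long paneIndex = pane.getIndex();  // per-key trigger-firing sequence number
    boolean isLast = pane.isLast();    // true for the final firing of the window
    // e.g. attach paneIndex to the write so the store can reject stale panes
  }
}
```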
>
> Most of Beam's IO connectors do not explicitly enforce that outputs occur
> in pane index order but instead rely on the hope that the runner delivers
> panes in order to the sink. IMO this is dangerous but it has not yet caused
> a known issue. In practice, each "input key to output key" path through a
> pipeline's logic does preserve order for all existing runners AFAIK, and
> it is the formalization that is missing. It is related to an observation
> by +Rui Wang <[email protected]> that processing retractions requires the
> same key-to-key ordering.
>
> I will not try to formalize this notion in this email. But I will note
> that since it is universally assured, it would be zero cost and
> significantly safer to formalize it and add an annotation noting it was
> required. It has nothing to do with event time ordering, only trigger
> firing ordering.
>
> Kenn
>
> [1]
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
> [2]
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L557
>
>
> On Mon, Nov 25, 2019 at 4:06 PM Pablo Estrada <[email protected]> wrote:
>
>> The blog posts on stateful and timely computation with Beam should help
>> clarify a lot about how to use state and timers to do this:
>> https://beam.apache.org/blog/2017/02/13/stateful-processing.html
>> https://beam.apache.org/blog/2017/08/28/timely-processing.html
>>
>> You'll see there how there's an implicit per-single-element grouping for
>> each key, so state and timers should support your use case very well.
>>
>> Best
>> -P.
>>
>> On Mon, Nov 25, 2019 at 3:47 PM Steve Niemitz <[email protected]>
>> wrote:
>>
>>> If you have a pipeline that looks like Input -> GroupByKey -> ParDo,
>>> while it is not guaranteed, in practice the sink will observe the trigger
>>> firings in order (per key), since it'll be fused to the output of the GBK
>>> operation (in all runners I know of).
>>>
>>> There have been a couple threads about trigger ordering as well on the
>>> list recently that might have more information:
>>>
>>> https://lists.apache.org/thread.html/b61a908289a692dbd80dd6a869759eacd45b308cb3873bfb77c4def6@%3Cdev.beam.apache.org%3E
>>>
>>> https://lists.apache.org/thread.html/20d11046d26174969ef44a781e409a1cb9f7c736e605fa40fdf98397@%3Cuser.beam.apache.org%3E
>>>
>>>
>>> On Mon, Nov 25, 2019 at 5:52 PM Aaron Dixon <[email protected]> wrote:
>>>
>>>> @Jan @Pablo Thank you
>>>>
>>>> @Pablo In this case it's a single global windowed Combine/perKey,
>>>> triggered per element. Keys are few (client accounts) so they can live
>>>> forever.
>>>>
>>>> It looks like just by virtue of using a stateful ParDo I could get this
>>>> final execution to be "serialized" per key. (Then I could simply do the
>>>> compare-and-swap using Beam's state mechanism to keep track of the "latest
>>>> trigger timestamp" instead of having to orchestrate compare-and-swap in the
>>>> target store :thinking:.)
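To make the compare-and-swap idea concrete, here is a plain-Java sketch of
the guard logic with no Beam types (the class and method names are made up
for illustration); in a stateful ParDo the AtomicReference would instead be
a ValueState holding the latest accepted timestamp:

```java
import java.util.concurrent.atomic.AtomicReference;

// Illustrative compare-and-swap guard: only accept a write whose event
// timestamp is at least as new as the last accepted one, so the stored
// value can never regress to an earlier pane's result.
class LatestWriteGuard {
  // holds {timestampMillis, value}; starts "empty" at Long.MIN_VALUE
  private final AtomicReference<long[]> last =
      new AtomicReference<>(new long[] {Long.MIN_VALUE, 0L});

  /** Returns true if the write was accepted (not older than what is stored). */
  boolean tryWrite(long timestampMillis, long value) {
    while (true) {
      long[] current = last.get();
      if (timestampMillis < current[0]) {
        return false; // stale pane: accepting it would regress the value
      }
      if (last.compareAndSet(current, new long[] {timestampMillis, value})) {
        return true;
      }
      // lost a race with a concurrent writer; re-read and retry
    }
  }

  long currentValue() {
    return last.get()[1];
  }
}
```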
>>>>
>>>>
>>>>
>>>> On Mon, Nov 25, 2019 at 4:14 PM Jan Lukavský <[email protected]> wrote:
>>>>
>>>>> One addition, to make the list of options exhaustive, there is
>>>>> probably
>>>>> one more option
>>>>>
>>>>>   c) create a ParDo keyed by primary key of your sink, cache the last
>>>>> write in there and compare it locally, without the need to query the
>>>>> database
>>>>>
>>>>> It would still need some timer to clear values after watermark +
>>>>> allowed lateness, because otherwise you would have to cache your whole
>>>>> database on workers. But because you don't need actual ordering, you
>>>>> just need the most recent value (if I got it right), this might be an
>>>>> option.
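Option c) might look roughly like this, modeled in plain Java with the Beam
timer replaced by an explicit onWatermark call (all names here are
illustrative, not Beam API):

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of option (c): remember the last written timestamp per
// sink key locally and drop writes that are older. The "watermark" cleanup
// is modeled as an explicit call, standing in for an event-time timer.
class PerKeyLastValueCache {
  private final Map<String, Long> lastTimestamp = new HashMap<>();

  /** Returns true if this (key, timestamp) is not older than anything seen. */
  boolean shouldWrite(String key, long timestampMillis) {
    Long seen = lastTimestamp.get(key);
    if (seen != null && timestampMillis < seen) {
      return false; // stale: the database already has a newer value
    }
    lastTimestamp.put(key, timestampMillis);
    return true;
  }

  /** Evict entries at or below the watermark (would be a timer in Beam). */
  void onWatermark(long watermarkMillis) {
    lastTimestamp.values().removeIf(ts -> ts <= watermarkMillis);
  }
}
```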
>>>>>
>>>>> Jan
>>>>>
>>>>> On 11/25/19 10:53 PM, Jan Lukavský wrote:
>>>>> > Hi Aaron,
>>>>> >
>>>>> > maybe someone else will give another option, but if I understand
>>>>> > correctly what you want to solve, then you essentially have to do
>>>>> either:
>>>>> >
>>>>> >  a) use the compare & swap mechanism in the sink you described
>>>>> >
>>>>> >  b) use a buffer to buffer elements inside the outputting ParDo and
>>>>> > only output them when watermark passes (using a timer).
>>>>> >
>>>>> > There is actually an ongoing discussion about how to make option b)
>>>>> > user-friendly and part of Beam itself, but currently there is no
>>>>> > out-of-the-box solution for that.
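Option b) can be modeled in plain Java as follows; in Beam the buffer would
be a BagState and onWatermark an event-time timer callback (the class and
method names are illustrative):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Plain-Java model of option (b): buffer elements and only release them,
// sorted by event time, once the watermark has passed them.
class WatermarkBuffer {
  private final List<long[]> buffer = new ArrayList<>(); // {timestamp, value}

  void add(long timestampMillis, long value) {
    buffer.add(new long[] {timestampMillis, value});
  }

  /** Emit (and drop) all values at or before the watermark, in event-time order. */
  List<Long> onWatermark(long watermarkMillis) {
    List<long[]> ready = new ArrayList<>();
    for (long[] e : buffer) {
      if (e[0] <= watermarkMillis) {
        ready.add(e);
      }
    }
    buffer.removeAll(ready); // removes the exact buffered entries
    ready.sort(Comparator.comparingLong(e -> e[0]));
    List<Long> out = new ArrayList<>();
    for (long[] e : ready) {
      out.add(e[1]);
    }
    return out;
  }
}
```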
>>>>> >
>>>>> > Jan
>>>>> >
>>>>> > On 11/25/19 10:27 PM, Aaron Dixon wrote:
>>>>> >> Suppose I trigger a Combine per-element (in a high-volume stream)
>>>>> >> and use a ParDo as a sink.
>>>>> >>
>>>>> >> I assume there is no guarantee about the order that my ParDo will
>>>>> >> see these triggers, especially as it processes in parallel, anyway.
>>>>> >>
>>>>> >> That said, my sink writes to a db or cache and I would not like the
>>>>> >> cache to ever regress its value to something "before" what it has
>>>>> >> already written.
>>>>> >>
>>>>> >> Is the best way to solve this problem to always write the
>>>>> >> event-time in the cache and do a compare-and-swap only updating the
>>>>> >> sink if the triggered value in-hand is later than the target value?
>>>>> >>
>>>>> >> Or is there a better way to guarantee that my ParDo sink will
>>>>> >> process elements in-order? (Eg, if I can give up per-event/real-time,
>>>>> >> then a delay-based trigger would probably be sufficient I imagine.)
>>>>> >>
>>>>> >> Thanks for advice!
>>>>>
>>>>
