Jan - let's try to defrag the threads on your time sorting proposal. This
thread may have useful ideas, but I want to keep this one focused on
helping Aaron. You can link to this thread from other threads or from a
design doc. Does this seem OK to you?

Aaron - do you have the information you need to implement your sink? My
impression is that you have quite a good grasp of the issues even before
you asked.

Kenn

On Wed, Nov 27, 2019 at 3:05 AM Jan Lukavský <je...@seznam.cz> wrote:

> > Trigger firings can have decreasing event timestamps w/ the minimum
> > timestamp combiner*. I do think the issue at hand is best analyzed in
> > terms of the explicit ordering on panes. And I do think we need an
> > explicit guarantee or annotation strong enough to describe a sink that
> > is correct under all allowed runners. Today an antagonistic runner could
> > probably break a lot of things.
>
> Thanks for this insight. I didn't know about the relation between trigger
> firing (event) time - which is always non-decreasing - and the resulting
> timestamp of the output pane - which can be affected by the timestamp
> combiner and decrease in the cases you describe. What actually correlates
> with the pane index at all times is the processing time of the trigger
> firings. Would you say that the "annotation that would guarantee ordering
> of panes" could be viewed as a time ordering annotation with an additional
> time domain (event time vs. processing time)? Could these two then be
> viewed as a single annotation with some distinguishing parameter?
>
> @RequiresTimeSortedInput(Domain.PANE_INDEX | Domain.EVENT_TIME)
>
> ?
>
> Event time should probably be made the default, because that is
> information that is accessible with every WindowedValue, while the pane
> index is available only after a GBK (or, generally, might be available
> after every keyed sequential operation, but is missing after a source, for
> instance).
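>
> To make this concrete, a minimal sketch of how such a parameterized
> annotation might look on a DoFn. This is purely hypothetical - it
> illustrates the proposal above, not an existing Beam API:
>
>   // Hypothetical: the Domain parameter is the proposal above, not an
>   // existing Beam API.
>   @RequiresTimeSortedInput(Domain.PANE_INDEX)
>   static class OrderedSinkFn extends DoFn<KV<String, Long>, Void> {
>     @ProcessElement
>     public void process(ProcessContext c) {
>       // The runner would be required to deliver the firings for each key
>       // in increasing pane index order.
>     }
>   }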
>
> Jan
> On 11/27/19 1:32 AM, Kenneth Knowles wrote:
>
>
>
> On Tue, Nov 26, 2019 at 1:00 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> > I will not try to formalize this notion in this email. But I will note
>> > that since it is universally assured, it would be zero cost and
>> > significantly safer to formalize it and add an annotation noting it was
>> > required. It has nothing to do with event time ordering, only trigger
>> > firing ordering.
>>
>> I cannot agree with the last sentence (and I'm really not doing this on
>> purpose :-)). Panes generally arrive out of order, as mentioned several
>> times in the discussions linked from this thread. If we want to ensure
>> "trigger firing ordering", we can use the pane index, that is correct. But
>> that is actually equivalent to sorting by event time, because the pane
>> index order will be (nearly) the same as the event time order. This is
>> due to the fact that pane index and event time are correlated (both are
>> monotonic).
>>
> Trigger firings can have decreasing event timestamps w/ the minimum
> timestamp combiner*. I do think the issue at hand is best analyzed in
> terms of the explicit ordering on panes. And I do think we need an
> explicit guarantee or annotation strong enough to describe a sink that
> is correct under all allowed runners. Today an antagonistic runner could
> probably break a lot of things.
>
> Kenn
>
> *In fact, they can decrease even with the "maximum" timestamp combiner,
> because timestamp combiners actually apply only to the elements in that
> particular pane. This is weird, and maybe a design bug, but good to know
> about.
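>
> A minimal sketch of where the combiner is configured (the window size and
> element types are made up, and "input" is assumed to be a
> PCollection<KV<String, Long>>):
>
>   import org.apache.beam.sdk.transforms.windowing.FixedWindows;
>   import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
>   import org.apache.beam.sdk.transforms.windowing.Window;
>   import org.apache.beam.sdk.values.KV;
>   import org.apache.beam.sdk.values.PCollection;
>   import org.joda.time.Duration;
>
>   // EARLIEST takes the minimum timestamp of the elements in each pane, so
>   // a later pane that happens to contain an earlier element can carry a
>   // smaller timestamp than the pane before it.
>   PCollection<KV<String, Long>> windowed = input.apply(
>       Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardMinutes(5)))
>           .withTimestampCombiner(TimestampCombiner.EARLIEST));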
>
>
>> The pane index "only" solves the issue of preserving ordering even in
>> cases where there are multiple firings with the same timestamp (regardless
>> of granularity). This was mentioned in the initial discussion about event
>> time ordering, and is part of the design doc - users should be allowed to
>> provide a UDF for extracting a time-correlated ordering field (which means
>> the ability to choose a preferred, or authoritative, observer which
>> assigns unambiguous ordering to events). Examples of this might include
>> Kafka offsets, or any queue index for that matter. This is not yet
>> implemented, but could (should) be in the future.
>>
>> The only case where these two things are (somewhat) different is the case
>> mentioned by @Steve - if the output is a stateless ParDo, which will get
>> fused. But that is only because the processing is single-threaded per key,
>> and therefore the ordering is implied by timer ordering (and careful here,
>> many runners don't have this ordering 100% correct as of now - this
>> problem luckily appears only when there are multiple timers per key).
>> Moreover, if there is a failure, the output might (would) go back in time
>> anyway. If there were a shuffle operation after the GBK/Combine, the
>> ordering would no longer be guaranteed and would have to be explicitly
>> taken care of.
>>
>> Last note, I must agree with @Rui that all these discussions are very
>> much related to retractions (precisely, to the ability to implement them).
>>
>> Jan
>> On 11/26/19 7:34 AM, Kenneth Knowles wrote:
>>
>> Hi Aaron,
>>
>> Another insightful observation.
>>
>> Whenever an aggregation (GBK / Combine per key) has a trigger firing,
>> there is a per-key sequence number attached. It is included in metadata
>> known as "PaneInfo" [1]. The value of PaneInfo.getIndex() is colloquially
>> referred to as the "pane index". You can also make use of the "on time
>> index" if you like. The best way to access this metadata is to add a
>> parameter of type PaneInfo to your DoFn's @ProcessElement method [2]. This
>> works for stateful and stateless DoFns.
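>>
>> A minimal sketch of that parameter access (the key and value types are
>> made up; the PaneInfo parameter and its accessors are the real API):
>>
>>   import org.apache.beam.sdk.transforms.DoFn;
>>   import org.apache.beam.sdk.transforms.windowing.PaneInfo;
>>   import org.apache.beam.sdk.values.KV;
>>
>>   static class PaneAwareSink extends DoFn<KV<String, Long>, Void> {
>>     @ProcessElement
>>     public void process(ProcessContext c, PaneInfo pane) {
>>       // 0-based firing sequence per key and window.
>>       long paneIndex = pane.getIndex();
>>       // The "on time index" (-1 for early/speculative panes).
>>       long onTimeIndex = pane.getNonSpeculativeIndex();
>>       // ... write c.element() to the sink, e.g. guarded by paneIndex ...
>>     }
>>   }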
>>
>> Most of Beam's IO connectors do not explicitly enforce that outputs occur
>> in pane index order but instead rely on the hope that the runner delivers
>> panes in order to the sink. IMO this is dangerous, but it has not yet
>> caused a known issue. In practice, each "input key to output key" path
>> through a pipeline's logic does preserve order on all existing runners
>> AFAIK, and it is the formalization that is missing. It is related to an
>> observation by +Rui Wang <ruw...@google.com> that processing retractions
>> requires the same key-to-key ordering.
>>
>> I will not try to formalize this notion in this email. But I will note
>> that since it is universally assured, it would be zero cost and
>> significantly safer to formalize it and add an annotation noting it was
>> required. It has nothing to do with event time ordering, only trigger
>> firing ordering.
>>
>> Kenn
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
>> [2]
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L557
>>
>>
>> On Mon, Nov 25, 2019 at 4:06 PM Pablo Estrada <pabl...@google.com> wrote:
>>
>>> The blog posts on stateful and timely computation with Beam should help
>>> clarify a lot about how to use state and timers to do this:
>>> https://beam.apache.org/blog/2017/02/13/stateful-processing.html
>>> https://beam.apache.org/blog/2017/08/28/timely-processing.html
>>>
>>> You'll see there how there's an implicit per-single-element grouping for
>>> each key, so state and timers should support your use case very well.
>>>
>>> Best
>>> -P.
>>>
>>> On Mon, Nov 25, 2019 at 3:47 PM Steve Niemitz <sniem...@apache.org>
>>> wrote:
>>>
>>>> If you have a pipeline that looks like Input -> GroupByKey -> ParDo,
>>>> while it is not guaranteed, in practice the sink will observe the trigger
>>>> firings in order (per key), since it'll be fused to the output of the GBK
>>>> operation (in all runners I know of).
>>>>
>>>> There have been a couple of threads about trigger ordering on the list
>>>> recently as well that might have more information:
>>>>
>>>> https://lists.apache.org/thread.html/b61a908289a692dbd80dd6a869759eacd45b308cb3873bfb77c4def6@%3Cdev.beam.apache.org%3E
>>>>
>>>> https://lists.apache.org/thread.html/20d11046d26174969ef44a781e409a1cb9f7c736e605fa40fdf98397@%3Cuser.beam.apache.org%3E
>>>>
>>>>
>>>> On Mon, Nov 25, 2019 at 5:52 PM Aaron Dixon <atdi...@gmail.com> wrote:
>>>>
>>>>> @Jan @Pablo Thank you
>>>>>
>>>>> @Pablo In this case it's a single global-windowed Combine.perKey,
>>>>> triggered per element. Keys are few (client accounts) so they can live
>>>>> forever.
>>>>>
>>>>> It looks like just by virtue of using a stateful ParDo I could get
>>>>> this final execution to be "serialized" per key. (Then I could simply do
>>>>> the compare-and-swap using Beam's state mechanism to keep track of the
>>>>> "latest trigger timestamp" instead of having to orchestrate
>>>>> compare-and-swap in the target store :thinking:.)
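>>>>>
>>>>> A minimal sketch of that compare-and-swap (the key/value types and the
>>>>> writeToStore call are made up; the state API is Beam's):
>>>>>
>>>>>   import org.apache.beam.sdk.coders.VarLongCoder;
>>>>>   import org.apache.beam.sdk.state.StateSpec;
>>>>>   import org.apache.beam.sdk.state.StateSpecs;
>>>>>   import org.apache.beam.sdk.state.ValueState;
>>>>>   import org.apache.beam.sdk.transforms.DoFn;
>>>>>   import org.apache.beam.sdk.values.KV;
>>>>>
>>>>>   static class GuardedSink extends DoFn<KV<String, Long>, Void> {
>>>>>     @StateId("latestTs")
>>>>>     private final StateSpec<ValueState<Long>> latestTsSpec =
>>>>>         StateSpecs.value(VarLongCoder.of());
>>>>>
>>>>>     @ProcessElement
>>>>>     public void process(
>>>>>         ProcessContext c, @StateId("latestTs") ValueState<Long> latestTs) {
>>>>>       long ts = c.timestamp().getMillis();
>>>>>       Long seen = latestTs.read();
>>>>>       if (seen == null || ts > seen) {
>>>>>         latestTs.write(ts);
>>>>>         writeToStore(c.element());  // hypothetical sink write
>>>>>       }
>>>>>       // else: an older firing arrived late - drop it so the store
>>>>>       // never regresses.
>>>>>     }
>>>>>   }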
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Nov 25, 2019 at 4:14 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>
>>>>>> One addition, to make the list of options exhaustive, there is
>>>>>> probably
>>>>>> one more option
>>>>>>
>>>>>>   c) create a ParDo keyed by primary key of your sink, cache the last
>>>>>> write in there and compare it locally, without the need to query the
>>>>>> database
>>>>>>
>>>>>> It would still need some timer to clear values after watermark +
>>>>>> allowed lateness, because otherwise you would have to cache your whole
>>>>>> database on workers. But because you don't need actual ordering, just
>>>>>> the most recent value (if I got it right), this might be an option.
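>>>>>>
>>>>>> A minimal sketch of that cleanup (a fragment to add to such a stateful
>>>>>> ParDo - the "lastWrite" state cell is the per-key cache, and
>>>>>> allowedLateness is assumed to be a Duration field matching the
>>>>>> pipeline's windowing configuration):
>>>>>>
>>>>>>   import org.apache.beam.sdk.state.TimeDomain;
>>>>>>   import org.apache.beam.sdk.state.Timer;
>>>>>>   import org.apache.beam.sdk.state.TimerSpec;
>>>>>>   import org.apache.beam.sdk.state.TimerSpecs;
>>>>>>   import org.apache.beam.sdk.state.ValueState;
>>>>>>   import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
>>>>>>
>>>>>>   @TimerId("cleanup")
>>>>>>   private final TimerSpec cleanupSpec =
>>>>>>       TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>>>>>
>>>>>>   @ProcessElement
>>>>>>   public void process(
>>>>>>       ProcessContext c,
>>>>>>       BoundedWindow window,
>>>>>>       @StateId("lastWrite") ValueState<Long> lastWrite,
>>>>>>       @TimerId("cleanup") Timer cleanup) {
>>>>>>     // Keep the cached value only until late data can no longer arrive.
>>>>>>     cleanup.set(window.maxTimestamp().plus(allowedLateness));
>>>>>>     // ... compare the element against lastWrite and write to the
>>>>>>     // database only if it is newer ...
>>>>>>   }
>>>>>>
>>>>>>   @OnTimer("cleanup")
>>>>>>   public void onCleanup(@StateId("lastWrite") ValueState<Long> lastWrite) {
>>>>>>     lastWrite.clear();  // free the per-key cache
>>>>>>   }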
>>>>>>
>>>>>> Jan
>>>>>>
>>>>>> On 11/25/19 10:53 PM, Jan Lukavský wrote:
>>>>>> > Hi Aaron,
>>>>>> >
>>>>>> > maybe someone else will give another option, but if I understand
>>>>>> > correctly what you want to solve, then you essentially have to do
>>>>>> either:
>>>>>> >
>>>>>> >  a) use the compare & swap mechanism in the sink you described
>>>>>> >
>>>>>> >  b) use a buffer to hold elements inside the outputting ParDo and
>>>>>> > only output them when the watermark passes (using a timer).
>>>>>> >
>>>>>> > There is actually an ongoing discussion about how to make option b)
>>>>>> > user-friendly and part of Beam itself, but currently there is no
>>>>>> > out-of-the-box solution for that.
>>>>>> >
>>>>>> > Jan
>>>>>> >
>>>>>> > On 11/25/19 10:27 PM, Aaron Dixon wrote:
>>>>>> >> Suppose I trigger a Combine per element (in a high-volume stream)
>>>>>> >> and use a ParDo as a sink.
>>>>>> >>
>>>>>> >> I assume there is no guarantee about the order in which my ParDo
>>>>>> >> will see these trigger firings, especially as it processes in
>>>>>> >> parallel anyway.
>>>>>> >>
>>>>>> >> That said, my sink writes to a db or cache, and I would not like
>>>>>> >> the cache to ever regress its value to something "before" what it
>>>>>> >> has already written.
>>>>>> >>
>>>>>> >> Is the best way to solve this problem to always write the event
>>>>>> >> time in the cache and do a compare-and-swap, only updating the sink
>>>>>> >> if the triggered value in hand is later than the target value?
>>>>>> >>
>>>>>> >> Or is there a better way to guarantee that my ParDo sink will
>>>>>> >> process elements in order? (E.g., if I can give up
>>>>>> >> per-event/real-time, then a delay-based trigger would probably be
>>>>>> >> sufficient, I imagine.)
>>>>>> >>
>>>>>> >> Thanks for the advice!
>>>>>>
>>>>>
