Jan - let's try to defrag the threads on your time sorting proposal. This thread may have useful ideas, but I want to focus on helping Aaron here. You can link to this thread from other threads or from a design doc. Does this seem OK to you?
Aaron - do you have the information you need to implement your sink? My impression is that you have quite a good grasp of the issues even before you asked.

Kenn

On Wed, Nov 27, 2019 at 3:05 AM Jan Lukavský <[email protected]> wrote:

> > Trigger firings can have decreasing event timestamps w/ the minimum
> > timestamp combiner*. I do think the issue at hand is best analyzed in
> > terms of the explicit ordering on panes. And I do think we need to have
> > an explicit guarantee or annotation strong enough to describe a sink
> > that is correct under all allowed runners. Today an antagonistic runner
> > could probably break a lot of things.
>
> Thanks for this insight. I didn't know about the relation between trigger
> firing (event) time - which is always non-decreasing - and the resulting
> timestamp of the output pane - which can be affected by the timestamp
> combiner and decrease in the cases you describe. What actually correlates
> with the pane index at all times is the processing time of the trigger
> firings. Would you say that the "annotation that would guarantee ordering
> of panes" could be viewed as a time ordering annotation with an
> additional time domain (event time, processing time)? Could these two
> then be viewed as a single annotation with some distinguishing parameter?
>
> @RequiresTimeSortedInput(Domain.PANE_INDEX | Domain.EVENT_TIME)
>
> ?
>
> Event time should probably be made the default, because that is
> information that is accessible with every WindowedValue, while the pane
> index is available only after a GBK (or generally might be available
> after every keyed sequential operation, but is missing after a source,
> for instance).
>
> Jan
>
> On 11/27/19 1:32 AM, Kenneth Knowles wrote:
>
> On Tue, Nov 26, 2019 at 1:00 AM Jan Lukavský <[email protected]> wrote:
>
>> I will not try to formalize this notion in this email.
>> But I will note that since it is universally assured, it would be zero
>> cost and significantly safer to formalize it and add an annotation
>> noting it was required. It has nothing to do with event time ordering,
>> only trigger firing ordering.
>>
>> I cannot agree with the last sentence (and I'm really not doing this on
>> purpose :-)). Panes generally arrive out of order, as mentioned several
>> times in the discussions linked from this thread. If we want to ensure
>> "trigger firing ordering", we can use the pane index, that is correct.
>> But - that is actually equivalent to sorting by event time, because pane
>> index order will be (nearly) the same as event time order. This is due
>> to the fact that pane index and event time correlate (both are
>> monotonic).
>
> Trigger firings can have decreasing event timestamps w/ the minimum
> timestamp combiner*. I do think the issue at hand is best analyzed in
> terms of the explicit ordering on panes. And I do think we need to have
> an explicit guarantee or annotation strong enough to describe a sink
> that is correct under all allowed runners. Today an antagonistic runner
> could probably break a lot of things.
>
> Kenn
>
> *In fact, they can decrease via the "maximum" timestamp combiner, because
> timestamp combiners actually only apply to the elements in that
> particular pane. This is weird, and maybe a design bug, but good to know
> about.
>
>> The pane index "only" solves the issue of preserving ordering even in
>> cases where there are multiple firings within the same timestamp
>> (regardless of granularity). This was mentioned in the initial
>> discussion about event time ordering, and is part of the design doc -
>> users should be allowed to provide a UDF for extracting a
>> time-correlated ordering field (which means the ability to choose a
>> preferred, or authoritative, observer which assigns unambiguous ordering
>> to events). Examples of this might include Kafka offsets, or any queue
>> index for that matter.
>> This is not yet implemented, but could (should) be in the future.
>>
>> The only case where these two things are (somewhat) different is the
>> case mentioned by @Steve - if the output is a stateless ParDo, which
>> will get fused. But that is only because the processing is
>> single-threaded per key, and therefore the ordering is implied by timer
>> ordering (and careful here, many runners don't have this ordering 100%
>> correct as of now - this problem luckily appears only when there are
>> multiple timers per key). Moreover, if there were a failure, the output
>> might (would) go back in time anyway. If there were a shuffle operation
>> after GBK/Combine, then the ordering is no longer guaranteed and must be
>> explicitly taken care of.
>>
>> Last note, I must agree with @Rui that all these discussions are very
>> much related to retractions (precisely, the ability to implement them).
>>
>> Jan
>>
>> On 11/26/19 7:34 AM, Kenneth Knowles wrote:
>>
>> Hi Aaron,
>>
>> Another insightful observation.
>>
>> Whenever an aggregation (GBK / Combine per key) has a trigger firing,
>> there is a per-key sequence number attached. It is included in metadata
>> known as "PaneInfo" [1]. The value of PaneInfo.getIndex() is
>> colloquially referred to as the "pane index". You can also make use of
>> the "on time index" if you like. The best way to access this metadata is
>> to add a parameter of type PaneInfo to your DoFn's @ProcessElement
>> method [2]. This works for stateful or stateless DoFns.
>>
>> Most of Beam's IO connectors do not explicitly enforce that outputs
>> occur in pane index order but instead rely on the hope that the runner
>> delivers panes in order to the sink. IMO this is dangerous, but it has
>> not yet caused a known issue. In practice, each "input key to output
>> key" path through a pipeline's logic does preserve order for all
>> existing runners AFAIK, and it is the formalization that is missing.
>> It is related to an observation by +Rui Wang <[email protected]> that
>> processing retractions requires the same key-to-key ordering.
>>
>> I will not try to formalize this notion in this email. But I will note
>> that since it is universally assured, it would be zero cost and
>> significantly safer to formalize it and add an annotation noting it was
>> required. It has nothing to do with event time ordering, only trigger
>> firing ordering.
>>
>> Kenn
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
>> [2]
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L557
>>
>> On Mon, Nov 25, 2019 at 4:06 PM Pablo Estrada <[email protected]> wrote:
>>
>>> The blog posts on stateful and timely computation with Beam should help
>>> clarify a lot about how to use state and timers to do this:
>>> https://beam.apache.org/blog/2017/02/13/stateful-processing.html
>>> https://beam.apache.org/blog/2017/08/28/timely-processing.html
>>>
>>> You'll see there how there's an implicit per-single-element grouping
>>> for each key, so state and timers should support your use case very
>>> well.
>>>
>>> Best
>>> -P.
>>>
>>> On Mon, Nov 25, 2019 at 3:47 PM Steve Niemitz <[email protected]>
>>> wrote:
>>>
>>>> If you have a pipeline that looks like Input -> GroupByKey -> ParDo,
>>>> while it is not guaranteed, in practice the sink will observe the
>>>> trigger firings in order (per key), since it'll be fused to the
>>>> output of the GBK operation (in all runners I know of).
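[Editor's sketch] The pane-index metadata Kenn describes can be used to guard a sink against out-of-order delivery. Below is a standalone plain-Java sketch, not Beam API: in an actual DoFn the index would come from PaneInfo.getIndex() and the per-key "last index" map would be kept in Beam state; class and method names here are illustrative.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of a sink that refuses to let output regress by tracking the last
// pane index written per key. A pane is written only if its index is
// greater than the last index written for that key.
class OrderedSink {
  private final Map<String, Long> lastIndex = new HashMap<>();
  private final Map<String, String> store = new HashMap<>();

  /** Writes value only if paneIndex is newer than anything seen for key. */
  boolean write(String key, long paneIndex, String value) {
    Long last = lastIndex.get(key);
    if (last != null && paneIndex <= last) {
      return false; // stale or duplicate pane: drop it
    }
    lastIndex.put(key, paneIndex);
    store.put(key, value);
    return true;
  }

  String read(String key) {
    return store.get(key);
  }

  public static void main(String[] args) {
    OrderedSink sink = new OrderedSink();
    sink.write("acct-1", 0, "v0");
    sink.write("acct-1", 2, "v2");                 // pane 1 was delayed...
    boolean wrote = sink.write("acct-1", 1, "v1"); // ...and arrives late
    System.out.println(wrote);                     // false: stale pane dropped
    System.out.println(sink.read("acct-1"));       // v2
  }
}
```

Since the pane index is a per-key sequence number, a strictly-greater check also deduplicates redelivered panes, which is why the comparison is `<=` rather than `<`.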
>>>>
>>>> There have been a couple of threads about trigger ordering as well on
>>>> the list recently that might have more information:
>>>>
>>>> https://lists.apache.org/thread.html/b61a908289a692dbd80dd6a869759eacd45b308cb3873bfb77c4def6@%3Cdev.beam.apache.org%3E
>>>> https://lists.apache.org/thread.html/20d11046d26174969ef44a781e409a1cb9f7c736e605fa40fdf98397@%3Cuser.beam.apache.org%3E
>>>>
>>>> On Mon, Nov 25, 2019 at 5:52 PM Aaron Dixon <[email protected]> wrote:
>>>>
>>>>> @Jan @Pablo Thank you
>>>>>
>>>>> @Pablo In this case it's a single global windowed Combine/perKey,
>>>>> triggered per element. Keys are few (client accounts) so they can
>>>>> live forever.
>>>>>
>>>>> It looks like just by virtue of using a stateful ParDo I could get
>>>>> this final execution to be "serialized" per key. (Then I could
>>>>> simply do the compare-and-swap using Beam's state mechanism to keep
>>>>> track of the "latest trigger timestamp" instead of having to
>>>>> orchestrate the compare-and-swap in the target store :thinking:.)
>>>>>
>>>>> On Mon, Nov 25, 2019 at 4:14 PM Jan Lukavský <[email protected]> wrote:
>>>>>
>>>>>> One addition, to make the list of options exhaustive, there is
>>>>>> probably one more option:
>>>>>>
>>>>>> c) create a ParDo keyed by the primary key of your sink, cache the
>>>>>> last write in there, and compare it locally, without the need to
>>>>>> query the database.
>>>>>>
>>>>>> It would still need some timer to clear values after watermark +
>>>>>> allowed lateness, because otherwise you would have to cache your
>>>>>> whole database on the workers. But because you don't need actual
>>>>>> ordering, just the most recent value (if I got it right), this
>>>>>> might be an option.
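[Editor's sketch] Jan's option c) could be sketched, outside of Beam, roughly as below. All names (LocalCacheFilter, ALLOWED_LATENESS_MS, the method names) are illustrative, not Beam API; in a real pipeline the cache would be per-key state inside a stateful ParDo and the cleanup would be an event-time timer.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of option c): keep a per-key cache of the latest event timestamp
// already written, and compare locally instead of querying the database.
class LocalCacheFilter {
  static final long ALLOWED_LATENESS_MS = 60_000; // illustrative value

  private final Map<String, Long> latestMs = new HashMap<>();

  /** True iff this (key, eventTimeMs) is newer than anything cached. */
  boolean shouldWrite(String key, long eventTimeMs) {
    Long seen = latestMs.get(key);
    if (seen != null && eventTimeMs <= seen) {
      return false; // an equal or newer value was already written
    }
    latestMs.put(key, eventTimeMs);
    return true;
  }

  /**
   * Simulates the cleanup timer: entries older than watermark + allowed
   * lateness can be evicted, since data that old would be dropped as late
   * anyway, so the cached timestamp is no longer needed.
   */
  void onWatermarkAdvance(long watermarkMs) {
    latestMs.values().removeIf(ts -> ts < watermarkMs - ALLOWED_LATENESS_MS);
  }
}
```

The eviction is what keeps the cache from growing into a copy of the whole database, at the cost of only being correct up to the allowed-lateness horizon, which matches Jan's caveat above.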
>>>>>>
>>>>>> Jan
>>>>>>
>>>>>> On 11/25/19 10:53 PM, Jan Lukavský wrote:
>>>>>> > Hi Aaron,
>>>>>> >
>>>>>> > maybe someone else will give another option, but if I understand
>>>>>> > correctly what you want to solve, then you essentially have to do
>>>>>> > one of the following:
>>>>>> >
>>>>>> > a) use the compare & swap mechanism in the sink you described, or
>>>>>> >
>>>>>> > b) use a buffer to hold elements inside the outputting ParDo and
>>>>>> > only output them when the watermark passes (using a timer).
>>>>>> >
>>>>>> > There is actually an ongoing discussion about how to make option
>>>>>> > b) user-friendly and part of Beam itself, but currently there is
>>>>>> > no out-of-the-box solution for that.
>>>>>> >
>>>>>> > Jan
>>>>>> >
>>>>>> > On 11/25/19 10:27 PM, Aaron Dixon wrote:
>>>>>> >> Suppose I trigger a Combine per element (in a high-volume
>>>>>> >> stream) and use a ParDo as a sink.
>>>>>> >>
>>>>>> >> I assume there is no guarantee about the order in which my ParDo
>>>>>> >> will see these triggers, especially as it processes in parallel,
>>>>>> >> anyway.
>>>>>> >>
>>>>>> >> That said, my sink writes to a db or cache, and I would not like
>>>>>> >> the cache to ever regress its value to something "before" what
>>>>>> >> it has already written.
>>>>>> >>
>>>>>> >> Is the best way to solve this problem to always write the event
>>>>>> >> time in the cache and do a compare-and-swap, only updating the
>>>>>> >> sink if the triggered value in hand is later than the target
>>>>>> >> value?
>>>>>> >>
>>>>>> >> Or is there a better way to guarantee that my ParDo sink will
>>>>>> >> process elements in order? (E.g., if I can give up
>>>>>> >> per-event/real-time, then a delay-based trigger would probably
>>>>>> >> be sufficient, I imagine.)
>>>>>> >>
>>>>>> >> Thanks for advice!
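[Editor's sketch] The compare-and-swap approach Aaron proposes (option a) might look like the following in plain Java. ConcurrentHashMap.merge stands in for the target database's conditional-update primitive (a real store would use its own, e.g. a conditional/versioned put); the class and method names are illustrative.

```java
import java.util.concurrent.ConcurrentHashMap;

// Sketch of a compare-and-swap sink: store the event time next to each
// value and only replace an entry when the incoming event time is strictly
// later, so an out-of-order pane can never regress the stored value.
class CasStore {
  static final class Versioned {
    final long eventTimeMs;
    final String value;

    Versioned(long eventTimeMs, String value) {
      this.eventTimeMs = eventTimeMs;
      this.value = value;
    }
  }

  private final ConcurrentHashMap<String, Versioned> store =
      new ConcurrentHashMap<>();

  /** Atomically keeps whichever of (existing, incoming) is newer. */
  void put(String key, long eventTimeMs, String value) {
    store.merge(key, new Versioned(eventTimeMs, value),
        (existing, incoming) ->
            incoming.eventTimeMs > existing.eventTimeMs ? incoming : existing);
  }

  String get(String key) {
    Versioned v = store.get(key);
    return v == null ? null : v.value;
  }
}
```

Because the merge function keeps the newer of the two entries, the sink is idempotent under retries and safe under parallel, out-of-order pane delivery, which is exactly the property Aaron asks about.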
