Groovy, thanks Lukasz, thanks Robert. Really appreciate your input on this.
Stephan

On Fri, 8 Jun 2018, 01:15 Robert Bradshaw, <[email protected]> wrote:

> On Thu, Jun 7, 2018 at 3:42 PM Stephan Kotze <[email protected]> wrote:
>
>> Yep, I get that watermarks can move forward in chunks greater than one.
>>
>> I am also comfortable with the notion of Aggregate[Pete:09:02,X,Y] arriving before Aggregate[Pete:09:01,X,Y], and with them arriving out of order at another window with its own triggers.
>>
>> I don't need the data ordered in event time (strictly speaking); I'm happy with them arriving in any order. But I only want the trigger to fire and release its elements once all of the aggregates up to that point in time have become available.
>
> Timers are exactly this--they fire when you've seen all data up to a given timestamp. Until we support infinite window sets, I think that stateful DoFns are the simplest solution here. A CumulativeSum (or general CumulativeCombinePerKey) PTransform would probably make a nice blog post.
>
>> I did indeed consider the previous approach (using the feedback loop), and yep, the problem is no different. I wanted to explore the space further and find a more elegant solution (not introducing cycles if there was a better way to handle it).
>>
>> On Thu, Jun 7, 2018 at 10:34 PM Lukasz Cwik <[email protected]> wrote:
>>
>>> A watermark is a lower bound on data that is processed and available. It is specifically a lower bound because we want runners to be able to process each window in parallel.
>>>
>>> In your example, a Runner may choose to compute Aggregate[Pete:09:01,X,Y] in parallel with Aggregate[Pete:09:02,X,Y] even if the watermark is only at 9:00, and then advance the watermark to 9:03. This means that a downstream computation may see Aggregate[Pete:09:02,X,Y] before Aggregate[Pete:09:01,X,Y]. The Runner may actually process 1000s of windows at the same time in parallel and advance the watermark by an arbitrarily large amount.
>>> Nothing states that the watermark needs to advance only one time unit at a time.
>>>
>>> Your problem is specifically saying "I want you to provide me all the data ordered in event time per key" (a specialization of sorting). This would require Runners to take this massively parallel computation and order it per key, which doesn't exist in Apache Beam. People have requested support for ordering/sorting in the past, and there is some limited support inside the sorter extension[1], but nothing like what you're looking for. The only way to do this is to build the ordering yourself; if you can provide a generalization, I'm sure Apache Beam would be interested in a contribution of that kind.
>>>
>>> On the other hand, do session windows fit your use case, or do you truly need a global aggregation that is ongoing?
>>>
>>> Also, you had a very similar thread about doing Global Aggregation[2]. Does using a feedback loop via Pubsub/Kafka help you solve your problem, so that you're not using a Global Window and can rely on Apache Beam's triggers to handle the "global" aggregation? (And how is this problem different from the last?)
>>>
>>> 1: https://github.com/apache/beam/tree/master/sdks/java/extensions/sorter
>>> 2: https://lists.apache.org/thread.html/d24e57f918804fe0e8bbd950cbda772eb612c978e753530914398fd8@%3Cuser.beam.apache.org%3E
>>>
>>> On Thu, Jun 7, 2018 at 2:48 AM Stephan Kotze <[email protected]> wrote:
>>>
>>>> Thanks for the thorough replies!
>>>>
>>>> Raghu, I think I have mistakenly used the wrong terminology with regard to "the PCollection has a notion of 'which elements it has emitted for a given position of the watermark'", so apologies there :)
>>>>
>>>> The essence here does seem to be in this, though:
>>>>
>>>>> Thus the questions.
>>>>>
>>>>> If I have a ParDo on the GlobalWindow that is triggered by OnElementCountAtLeast(1) and events can arrive out of order, how can the ParDo have a watermark that moves only forward when it's possible to trigger on any number of elements having arrived? (This would appear to allow the emission of 2 later events, followed by an earlier event on another trigger.)
>>>>>
>>>> You have to effectively re-implement what Apache Beam Runners do by computing the watermark of the elements, figuring out if they are early, on time, or late, and buffering them so that they are grouped together correctly for each window with a StatefulDoFn. This is quite difficult to do in general.
>>>>
>>>> This, for me, feels like an unnecessarily complex thing one needs to implement to ensure completeness/correctness, even when you are sure to already have all the relevant data in the pipeline for a given time period.
>>>>
>>>> So if you'll humour me one more time please, I'll try to explain as concisely as possible (because it just feels wrong that this use case requires the added complexity of manually implemented stateful DoFns):
>>>>
>>>> - I have an unbounded source of events like these: Event[TimeStamp:Instant,PersonId:String,Amount:Long]
>>>> - The source is watermarked on event time (Event.TimeStamp)
>>>> - The messages have timestamps applied (beam timestamp = Event.TimeStamp)
>>>> - Events arrive in order of event time.
>>>> - I create aggregates like these: Aggregate[TimeStamp:Instant,PersonId:String,EventCount:Long,AmountSum:Long]
>>>>
>>>> I would like to obtain the following:
>>>>
>>>> | PersonId | Time  | 1Min:EventCount | 1Min:AmountSum | Global:EventCount | Global:AmountSum |
>>>> |----------|-------|-----------------|----------------|-------------------|------------------|
>>>> | Pete     | 09:01 | 3               | 7              | 3                 | 7                |
>>>> | Pete     | 09:02 | 1               | 2              | 4                 | 9                |
>>>> | Pete     | 09:03 | 5               | 9              | 9                 | 18               |
>>>> ...
>>>>
>>>> A rough starting point for a pipeline is:
>>>>
>>>> PCollection<KV<String, Aggregate>> per1MinAggStream = UnboundedSource<Event>
>>>>     -> KV.Create(Event:PersonId)
>>>>     -> FixedWindow.of(1M)
>>>>     -> GroupByKey()
>>>>     -> AggregateDoFn( -> c.output(key, new Aggregate(window.getMaxTimeStamp(), key, count(), sum())))
>>>>
>>>> PCollection<KV<String, Aggregate>> allTimeStream = per1MinAggStream
>>>>     -> new GlobalWindow().trigger(??? OnElementCountAtLeast(1) ????)
>>>>     -> GroupByKey()
>>>>     -> GlobalAggDoFn( -> c.output(key, new Aggregate(window.getMaxTimeStamp(), key, count(), sum())))
>>>>
>>>> // Potentially, re-assign allTimeStream to 1Min windows here
>>>>
>>>> results = KeyedPCollectionTuple
>>>>     .of(1MinAggsTag, per1MinAggStream)
>>>>     .and(globalAggsTag, allTimeStream)
>>>>     .apply(CoGroupByKey.create())
>>>>     -> doFn()
>>>>
>>>> This works if I can find a trigger for the global window that ensures it does not emit any 1Min aggregates to my GlobalAggDoFn while others are still in flight (in processing time, not event time). The 1Min aggregates may be out of order in the Iterable<Aggregate> provided to GlobalAggDoFn (I can re-order and emit the totals with the correct timestamps, then assign them to new 1Min windows). However, if the GlobalWindow triggers while the Iterable<Aggregate> still has missing 1Min aggregates, the global aggregates cannot be guaranteed to be correct/complete at a given point in time.
>>>>
>>>> An example:
>>>> 1) per1MinAggStream has received all events up to 09:03.
>>>> 2) The 1Minute windows have all triggered and are creating their aggregates.
>>>> 3) Some of the 1Minute aggregates complete their calcs and are streamed to the GlobalWindow.
>>>> 4) The AggregateDoFn that would create Aggregate[Pete:09:02,X,Y] is running slow.
>>>> 5) The GlobalWindow triggers and sends the following to the GlobalAggDoFn: Iterable<Aggregate> = [Aggregate[Pete:09:03,X,Y], Aggregate[Pete:09:01,X,Y]]
>>>> 6) I can construct a global total up to 09:01, emit that, and re-window, but I cannot do anything further, as I'm unable to tell whether there is actually no data for 09:02 or whether it has simply not arrived at the windowing function yet (all sorts of complications ensue).
>>>>
>>>> It just feels strange that even though I'm trying to tie everything to event time, when it comes to the global window (unless I get super fancy with stateful DoFns, I cannot get to a state, or find triggers, that allow me to remain consistent over event time), I have to work around it and start taking processing time into account.
>>>>
>>>> This sort of leads me back to the question: is there a trigger that ensures that the GlobalWindow releases events (aggregates in this case) to the GlobalAggDoFn only once it knows that no earlier events will arrive at the function? (We have a watermarked source, with messages arriving in order according to their event times.) I've not seen any triggers that would do this, unless, I suppose, like I asked earlier, the GlobalWindowFn somehow only emits events when they are complete in processing time as well as event time.
>>>>
>>>> Many thanks for all the help thus far.
>>>> Stephan
>>>>
>>>> On Thu, Jun 7, 2018 at 1:03 AM Raghu Angadi <[email protected]> wrote:
>>>>
>>>>> > * the PCollection has a notion of "which elements it has emitted for a given position of the watermark"
>>>>>
>>>>> This is not correct (to me it reads like you are saying something close to 'a PCollection is a stream of (element, watermark) tuples').
>>>>> Every element in a PCollection has an associated event timestamp; a PCollection is a stream of (element, timestamp) tuples. A watermark is not associated with a PCollection, and is completely independent of the event timestamps.
>>>>>
>>>>> IOW, the watermark is by definition monotonic. When a stage in your pipeline sets its watermark to 'X', what it means is that each of its inputs (sources like PubSub, or stages) has communicated a watermark timestamp Y saying it expects all its future elements will have event timestamp >= Y, and X = min(Y).
>>>>> A custom source that receives events with monotonically increasing timestamps can just report the timestamp of the last element emitted as its watermark.
>>>>>
>>>>> OnElementCountAtLeast(1) has no dependence on watermark progression, since that trigger does not depend on time.
>>>>>
>>>>> On Mon, Jun 4, 2018 at 12:07 PM Stephan Kotze <[email protected]> wrote:
>>>>>
>>>>>> Hi there.
>>>>>>
>>>>>> I have a question regarding the completeness of results in a GlobalWindow for a pipeline which receives all events in order (no late/missing data).
>>>>>>
>>>>>> The question is based on reasoning about Beam that takes 3 pieces of (our current) understanding into account:
>>>>>>
>>>>>> 1) Global Window Watermark
>>>>>> As I understand it, a PCollection with a GlobalWindow and ParDo will be running a watermark (which is what allows triggers in stateful DoFns to fire, for example).
>>>>>>
>>>>>> If this is the case:
>>>>>> * the PCollection has a notion of "which elements it has emitted for a given position of the watermark"
>>>>>> * the PCollection will also know which results from upstream PTransforms/PCollections etc. are still in flight
>>>>>> * the PCollection will emit results and update its watermark once upstream elements have all provided their results and shifted their watermarks.
>>>>>>
>>>>>> 2) Triggering on Watermark
>>>>>> For fixed windows, for example, we have the AfterWatermark.pastEndOfWindow() trigger.
>>>>>> In the case of Global windows, however, the GlobalWindow never "ends", so the watermark cannot progress past this point and we'll never get any results for something like new GlobalWindow().trigger(AfterWatermark.pastEndOfWindow()).
>>>>>>
>>>>>> 3) Ordering between upstream PCollections/PTransforms and GlobalWindows
>>>>>> In the following pipeline: Source -> fixedWindow(1m) -> GlobalWindow(), the 1Min segments can arrive out of order in the global window, even if the source was ordered (with no late data).
>>>>>>
>>>>>> Thus the questions.
>>>>>>
>>>>>> If I have a ParDo on the GlobalWindow that is triggered by OnElementCountAtLeast(1) and events can arrive out of order, how can the ParDo have a watermark that moves only forward when it's possible to trigger on any number of elements having arrived? (This would appear to allow the emission of 2 later events, followed by an earlier event on another trigger.)
>>>>>>
>>>>>> Or?
>>>>>>
>>>>>> Does the OnElementCountAtLeast only trigger once ALL upstream elements up to and including the watermark have arrived? (Though they may be unordered in the DoFn's input, for example, it is still a complete list with all upstream-produced elements between the last watermark and the "new" one that will be set once the ParDo has completed.)
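The contrast between the two trigger styles discussed here can be made concrete with a toy model of watermark-based firing for fixed windows. This is illustrative Python, not the Beam API: a 1-minute window [m, m+1) fires only once the watermark reaches m+1, so its pane is complete by construction, whereas an element-count trigger would fire in arrival order with no such guarantee:

```python
# Toy model of AfterWatermark-style firing for 1-minute fixed windows.
# events: (minute, value) pairs in arrival order (may be out of event order).
def fixed_windows_after_watermark(events, watermark):
    """Group events into 1-minute windows keyed by start minute m; a window
    [m, m+1) fires only when the watermark has reached m+1."""
    windows = {}
    for minute, value in events:
        windows.setdefault(minute, []).append(value)
    # Fire only windows whose end has been passed by the watermark.
    return sorted((m, vals) for m, vals in windows.items() if m + 1 <= watermark)

events = [(1, 3), (3, 9), (1, 4), (2, 2)]         # arrival order, not event order
print(fixed_windows_after_watermark(events, 3))   # -> [(1, [3, 4]), (2, [2])]
```

The window for minute 3 stays open even though its data arrived early, because the watermark has not yet passed its end; this is exactly the completeness property the GlobalWindow (which never ends) cannot offer through AfterWatermark.pastEndOfWindow().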
>>>>>>
>>>>>> I stress the point because it has some important repercussions for us (so I'm just paraphrasing the question slightly below, to try and make it as clear as possible :))
>>>>>>
>>>>>> How can a PTransform/PCollection on a Global Window have a monotonic watermark if events can trigger calcs with out-of-order events (when using a trigger such as OnElementCountAtLeast(1))? Or is there a guarantee that, when the trigger fires, we will receive a complete list of upstream results up to the time of the latest event in the collection we receive to process?
>>>>>>
>>>>>> Hopefully I've explained the question concisely enough :)
>>>>>>
>>>>>> Stephan
