Thanks for the thorough replies!
Raghu, I think I mistakenly used the wrong terminology with regard to
"PCollection has a notion of 'which elements it has emitted for a given
position of the watermark'", so apologies there :)
The essence here does seem to be in this though:
> Thus the questions.
>
> If I have a ParDo on the GlobalWindow that is triggered by
> OnElementCountAtLeast(1) and events can arrive out of order, how can the
> ParDo have a watermark that moves only forward when it's possible to
> trigger on any amount of elements having arrived (this would appear to
> allow the emission of 2 later events, followed by an earlier event for
> another trigger).
>
You have to effectively re-implement what Apache Beam runners do: compute
the watermark of the elements, figure out whether they are early, on time,
or late, and buffer them so that they are grouped together correctly for
each window with a stateful DoFn. This is quite difficult to do in
general.
This feels like an unnecessarily complex thing to have to implement just to
ensure completeness/correctness, even when you are sure that all the
relevant data for a given time period is already in the pipeline.
So if you'll humour me one more time, I'll try to explain as concisely as
possible (because it just feels wrong that this use case requires the added
complexity of manually implemented stateful DoFns).
- I have an unbounded source of events like these:
  Event[TimeStamp:Instant, PersonId:String, Amount:Long]
- The source is watermarked on event time (Event.TimeStamp).
- The messages have timestamps applied (Beam timestamp = Event.TimeStamp).
- Events arrive in order of event time.
- I create aggregates like these:
  Aggregate[TimeStamp:Instant, PersonId:String, EventCount:Long, AmountSum:Long]
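To pin down those shapes in plain Python (just a sketch of the data model, not Beam code):

```python
from dataclasses import dataclass

@dataclass
class Event:
    timestamp: int      # event time (an Instant in Beam terms)
    person_id: str
    amount: int

@dataclass
class Aggregate:
    timestamp: int      # end of the window this aggregate covers
    person_id: str
    event_count: int
    amount_sum: int
```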
I would like to obtain the following:
------------------------------------------------------------------------------------------------
| PersonId | Time  | 1Min:EventCount | 1Min:AmountSum | Global:EventCount | Global:AmountSum |
------------------------------------------------------------------------------------------------
| Pete     | 09:01 | 3               | 7              | 3                 | 7                 |
------------------------------------------------------------------------------------------------
| Pete     | 09:02 | 1               | 2              | 4                 | 9                 |
------------------------------------------------------------------------------------------------
| Pete     | 09:03 | 5               | 9              | 9                 | 18                |
------------------------------------------------------------------------------------------------
....
------------------------------------------------------------------------------------------------
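For an ordered stream this table is just per-minute totals plus a per-key running total. As a plain-Python sketch of the computation I want (the function and tuple layout are mine, not Beam):

```python
from collections import defaultdict

def aggregate(events):
    """events: iterable of (minute, person_id, amount) in event-time order.
    Returns rows of (person_id, minute, min_count, min_sum,
    global_count, global_sum), one per (person, minute)."""
    per_min = defaultdict(lambda: [0, 0])   # (person, minute) -> [count, sum]
    order = []                              # (person, minute) keys, first-seen order
    for minute, person, amount in events:
        key = (person, minute)
        if key not in per_min:
            order.append(key)
        per_min[key][0] += 1
        per_min[key][1] += amount
    rows = []
    running = defaultdict(lambda: [0, 0])   # person -> [count, sum] so far
    for person, minute in order:
        c, s = per_min[(person, minute)]
        running[person][0] += c
        running[person][1] += s
        rows.append((person, minute, c, s, running[person][0], running[person][1]))
    return rows
```

Fed Pete's events for 09:01–09:03, this reproduces exactly the rows in the table above.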
A rough starting point for a pipeline is:

PCollection<KV<String, Aggregate>> per1MinAggStream =
    UnboundedSource<Event>
    -> KV.create(Event::PersonId)
    -> FixedWindows.of(1m)
    -> GroupByKey()
    -> AggregateDoFn(-> c.output(key,
           new Aggregate(window.maxTimestamp(), key, count(), sum())))

PCollection<KV<String, Aggregate>> allTimeStream =
    per1MinAggStream
    -> new GlobalWindows().trigger(??? OnElementCountAtLeast(1) ???)
    -> GroupByKey()
    -> GlobalAggDoFn(-> c.output(key,
           new Aggregate(window.maxTimestamp(), key, count(), sum())))
// Potentially, re-assign allTimeStream to 1-min windows here

results = KeyedPCollectionTuple
    .of(1MinAggsTag, per1MinAggStream)
    .and(globalAggsTag, allTimeStream)
    .apply(CoGroupByKey.create())
    -> doFn()
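The final CoGroupByKey step, in plain-Python terms, just collects both streams' values side by side under the shared key (a simulation of the semantics, not Beam's API):

```python
from collections import defaultdict

def co_group_by_key(stream_a, stream_b):
    """Simulates CoGroupByKey over two keyed streams: for each key,
    returns the values seen on each input side by side."""
    grouped = defaultdict(lambda: ([], []))
    for k, v in stream_a:
        grouped[k][0].append(v)
    for k, v in stream_b:
        grouped[k][1].append(v)
    return dict(grouped)
```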
This works if I can find a trigger for the global window that ensures it
does not emit any 1-min aggregates to my GlobalAggDoFn while others are
still in flight (in processing time, not event time).
The 1-min aggregates may be out of order in the Iterable<Aggregate>
provided to GlobalAggDoFn; I can re-order them and emit the totals with
the correct timestamps, then assign them to new 1-min windows.
However, if the GlobalWindow triggers while the Iterable<Aggregate> is
still missing 1-min aggregates, the global aggregates cannot be guaranteed
to be correct/complete at a given point in time.
An example:
1) per1MinAggStream has received all events up to 09:03.
2) The 1-minute windows have all triggered and are creating their aggregates.
3) Some of the 1-minute aggregates complete their calculations and are
streamed to the GlobalWindow.
4) The AggregateDoFn that would create Aggregate[Pete:09:02,X,Y] is running
slow.
5) The GlobalWindow triggers and sends the following to the GlobalAggDoFn:
Iterable<Aggregate> = [Aggregate[Pete:09:03,X,Y], Aggregate[Pete:09:01,X,Y]]
6) I can construct a global total up to 09:01, emit that, and re-window,
but I cannot do anything further, as I'm unable to tell whether there is
actually no data for 09:02 or whether it has simply not yet arrived at the
windowing function (all sorts of complications ensue).
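That failure mode can be shown concretely (plain Python, minutes as integer indices, names mine): a count-triggered consumer can only emit running totals up to the first gap, because it cannot distinguish "no data" from "not yet arrived".

```python
def running_totals_up_to_gap(pane):
    """pane: list of (minute_index, event_count, amount_sum) aggregates,
    possibly out of order, with some minutes still in flight upstream.
    Emits running totals only up to the first gap in the minute sequence."""
    totals = []
    running_count = running_sum = 0
    expected = None
    for minute, count, amount in sorted(pane):
        if expected is not None and minute != expected:
            # Gap: cannot tell an empty minute from a slow upstream aggregate.
            break
        running_count += count
        running_sum += amount
        totals.append((minute, running_count, running_sum))
        expected = minute + 1
    return totals
```

With the pane from the example (09:03 and 09:01 present, 09:02 in flight) this can only emit the 09:01 total and then has to stop.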
It just feels strange that even though I'm trying to tie everything to
event time, when it comes to the global window I have to work around it
and start taking processing time into account (unless I get super fancy
with stateful DoFns, I cannot get to a state, or find triggers, that allow
me to remain consistent over event time).
This leads me back to the question: is there a trigger that ensures the
GlobalWindow releases events (aggregates in this case) to the GlobalAggDoFn
only once it knows that no earlier events will arrive at the function? (We
have a watermarked source, with messages arriving in order according to
their event times.) I've not seen any triggers that would do this, unless,
as I asked earlier, the GlobalWindowFn somehow emits events only when they
are complete in processing time as well as event time.
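The guarantee I'm after is essentially what a watermark already encodes: as Raghu explains below, a stage's watermark is the minimum over its inputs' reported watermarks, which is monotonic by construction. A plain-Python sketch of that rule (names mine):

```python
class StageWatermark:
    """Tracks a stage's output watermark as the minimum over its inputs'
    reported watermarks, clamped so it never moves backwards."""

    def __init__(self, input_names):
        self.inputs = {name: float("-inf") for name in input_names}
        self.watermark = float("-inf")

    def report(self, name, timestamp):
        # An input promises all its future elements have event time >= timestamp.
        self.inputs[name] = max(self.inputs[name], timestamp)
        # X = min(Y) over all inputs, never regressing.
        self.watermark = max(self.watermark, min(self.inputs.values()))
        return self.watermark
```

The stage's watermark only advances once every input has advanced, which is exactly the "nothing earlier can still arrive" condition I want a trigger to wait for.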
Many thanks for all the help thus far.
Stephan
On Thu, Jun 7, 2018 at 1:03 AM Raghu Angadi <[email protected]> wrote:
> > * the PCollection has a notion of "which elements it has emitted for a
> given position of the watermark"
> This is not correct (to me it reads like you are saying something close
> to 'a PCollection is a stream of (element, watermark) tuples').
> Every element in a PCollection has an associated event timestamp; a
> PCollection is a set of (element, timestamp) tuples. The watermark is not
> associated with a PCollection, and is completely independent of event
> timestamps.
>
> IOW, the watermark is by definition monotonic. When a stage in your
> pipeline sets its watermark to 'X', what it means is that each of its
> inputs (sources like PubSub, or upstream stages) has communicated a
> watermark timestamp Y, saying it expects all its future elements to have
> event timestamp >= Y; X = min(Y). A custom source that receives events
> with monotonically increasing timestamps can just report the timestamp of
> the last element emitted as its watermark.
>
> OnElementCountAtLeast(1) has no dependence on watermark progression,
> since the trigger does not depend on time.
>
>
> On Mon, Jun 4, 2018 at 12:07 PM Stephan Kotze <[email protected]>
> wrote:
>
>> Hi there.
>>
>> I have a question regarding the completeness of results in a GlobalWindow
>> for a pipeline which receives all events in order. (no late/missing data).
>>
>> The question is based on reasoning about Beam that takes 3 pieces of (our
>> current) understanding into account:
>>
>> 1) Global Window Watermark
>> As I understand it, a PCollection with a GlobalWindow and ParDo will be
>> running a watermark (which is what allows triggers in stateful DoFns to
>> fire, for example).
>>
>> If this is the case,
>> * the PCollection has a notion of "which elements it has emitted for a
>> given position of the watermark"
>> * the PCollection will also know which results from upstream
>> PTransforms/PCollections etc. are still in flight
>> * the PCollection will emit results and update its watermark once
>> Upstream elements have all provided their results and shifted their
>> watermarks.
>>
>> 2) Triggering on Watermark
>> For Fixed windows for example we have the
>> "AfterWatermark".pastEndOfWindow() trigger.
>> In the case of Global windows however, the GlobalWindow never "ends", so
>> the watermark cannot progress past this point and we'll never get any
>> results for something like
>> newGlobalWindow().trigger(AfterWatermark.pastEndOfWindow())
>>
>> 3) Ordering between upstream PCollections/PTransforms and GlobalWindows
>> In the following pipeline: Source -> fixedWindow(1m) -> GlobalWindow(),
>> the 1Min segments can arrive out of order in the global window, even if the
>> source was ordered (with no late data)
>>
>> Thus the questions.
>>
>> If I have a ParDo on the GlobalWindow that is triggered by
>> OnElementCountAtLeast(1) and events can arrive out of order, how can the
>> ParDo have a watermark that moves only forward when it's possible to
>> trigger on any amount of elements having arrived (this would appear to
>> allow the emission of 2 later events, followed by an earlier event for
>> another trigger).
>>
>> Or?
>>
>> Does OnElementCountAtLeast trigger only once ALL upstream elements up
>> to and including the watermark have arrived? (Though they may be
>> unordered in the DoFn's input, it would still be a complete list of all
>> upstream-produced elements between the last watermark and the "new" one
>> that will be set once the ParDo has completed.)
>>
>> I stress the point because it has some important repercussions for us (so
>> I'm just paraphrasing the question slightly below, to try and make it as
>> clear as possible :))
>>
>> How can a PTransform/PCollection on a GlobalWindow have a monotonic
>> watermark if events can trigger calcs out of order (when using a trigger
>> such as OnElementCountAtLeast(1))? Or is there a guarantee that, when
>> the trigger fires, we will receive a complete list of upstream results
>> up to the time of the latest event in the collection we receive to
>> process?
>>
>> Hopefully I've explained the question concisely enough :)
>>
>> Stephan
>>
>>