Yep, I get that watermarks can move forward in chunks greater than one time
unit.

I am also comfortable with the notion of Aggregate[Pete:09:02,X,Y] being
computed before Aggregate[Pete:09:01,X,Y], and with them arriving out of order
at another Window with its own triggers.

I don't need the data ordered in event time (strictly speaking); I'm happy
for the elements to arrive in any order. But I only want the trigger to fire
and release its elements once all of the aggregates up to that point in event
time have become available.

I did indeed consider the previous approach (using the feedback loop), and
yep, the problem is no different. I wanted to explore the space further and
find a more elegant solution (one that does not introduce cycles if there is
a better way to handle it).
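
To make that concrete, the direction I'm currently exploring is a stateful
DoFn that buffers the per-minute aggregates per key and uses an event-time
timer to release them only once the watermark guarantees nothing earlier can
still arrive. A rough, untested sketch (OrderedReleaseFn, Aggregate's
getTimestamp() and AggregateCoder are my own hypothetical names, and I'm
assuming no late data):

// Beam state/timer imports (org.apache.beam.sdk.state.*), DoFn, KV, joda
// Instant and java.util omitted for brevity.
class OrderedReleaseFn extends DoFn<KV<String, Aggregate>, KV<String, Aggregate>> {

  @StateId("key")
  private final StateSpec<ValueState<String>> keySpec =
      StateSpecs.value(StringUtf8Coder.of());

  @StateId("buffer")
  private final StateSpec<BagState<Aggregate>> bufferSpec =
      StateSpecs.bag(AggregateCoder.of());

  @StateId("maxSeen")
  private final StateSpec<ValueState<Instant>> maxSeenSpec =
      StateSpecs.value(InstantCoder.of());

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(ProcessContext c,
      @StateId("key") ValueState<String> key,
      @StateId("buffer") BagState<Aggregate> buffer,
      @StateId("maxSeen") ValueState<Instant> maxSeen,
      @TimerId("flush") Timer flush) {
    key.write(c.element().getKey());
    buffer.add(c.element().getValue());
    // Keep the timer aimed at the largest event timestamp seen so far. When
    // the watermark reaches it, everything at or before it must have arrived.
    Instant max = maxSeen.read();
    if (max == null || c.timestamp().isAfter(max)) {
      maxSeen.write(c.timestamp());
      flush.set(c.timestamp());
    }
  }

  @OnTimer("flush")
  public void onFlush(OnTimerContext c,
      @StateId("key") ValueState<String> key,
      @StateId("buffer") BagState<Aggregate> buffer) {
    List<Aggregate> ready = new ArrayList<>();
    buffer.read().forEach(ready::add);
    buffer.clear();
    // The watermark has passed every buffered timestamp, so sorting yields
    // the complete, ordered run of aggregates up to this point in event time.
    ready.sort(Comparator.comparing(Aggregate::getTimestamp));
    for (Aggregate a : ready) {
      // Emitting at the original timestamps may require holding the output
      // timestamp back (allowed skew, or a timer output timestamp), depending
      // on the SDK version.
      c.outputWithTimestamp(KV.of(key.read(), a), a.getTimestamp());
    }
  }
}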

On Thu, Jun 7, 2018 at 10:34 PM Lukasz Cwik <[email protected]> wrote:

> A watermark is a lower bound on data that is processed and available. It
> is specifically a lower bound because we want runners to be able to process
> each window in parallel.
>
> In your example, a Runner may choose to compute Aggregate[Pete:09:01,X,Y]
> in parallel with Aggregate[Pete:09:02,X,Y] even if the watermark is only at
> 9:00 and then advance the watermark to 9:03. This means that a downstream
> computation may see Aggregate[Pete:09:02,X,Y] before
> Aggregate[Pete:09:01,X,Y]. The Runner may actually process 1000s of windows
> at the same time in parallel and advance the watermark by an arbitrarily
> large number. Nothing states that the watermark only needs to advance one
> time unit at a time.
>
> Your problem is specifically saying "I want you to provide me all the data
> ordered in event time per key" (a specialization of sorting). This would
> require runners to take this massively parallel computation and order it
> per key, which doesn't exist in Apache Beam. People have requested support
> for ordering/sorting in the past, and there is some limited support inside
> the sorter extension[1], but nothing like what you're looking for. The only
> way to do this is to build the ordering yourself; if you can provide a
> generalization, I'm sure Apache Beam would be interested in a contribution
> of that kind.
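>
> For reference, usage of the sorter extension looks roughly like this (a
> sketch, untested; note it sorts the values within each key's Iterable by the
> encoded secondary key, it does not order the stream itself):
>
> // given: PCollection<KV<String, Iterable<KV<Instant, Aggregate>>>> grouped,
> // e.g. the output of a GroupByKey, where Instant's encoded form happens to
> // sort in timestamp order:
> grouped.apply(SortValues.<String, Instant, Aggregate>create(
>     BufferedExternalSorter.options()));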
>
> On the other hand, do session windows fit your use case, or do you truly
> need an ongoing global aggregation?
>
> Also, you had a very similar thread about doing global aggregation[2].
> Does using a feedback loop via Pubsub/Kafka help you solve your problem, so
> that you're not using a Global Window and can rely on Apache Beam's triggers
> to handle the "global" aggregation? (And how is this problem different from
> the last?)
>
> 1: https://github.com/apache/beam/tree/master/sdks/java/extensions/sorter
> 2:
> https://lists.apache.org/thread.html/d24e57f918804fe0e8bbd950cbda772eb612c978e753530914398fd8@%3Cuser.beam.apache.org%3E
>
>
>
> On Thu, Jun 7, 2018 at 2:48 AM Stephan Kotze <[email protected]>
> wrote:
>
>> Thanks for the thorough replies!
>>
>> Raghu, I think I mistakenly used the wrong terminology with regard
>> to "the PCollection has a notion of 'which elements it has emitted for a
>> given position of the watermark'", so apologies there :)
>>
>> The essence here does seem to be in this though:
>>
>>> Thus the questions.
>>>
>>> If I have a ParDo on the GlobalWindow that is triggered by
>>> OnElementCountAtLeast(1) and events can arrive out of order, how can the
>>> ParDo have a watermark that moves only forward when it's possible to
>>> trigger on any number of elements having arrived (this would appear to
>>> allow the emission of two later events, followed by an earlier event for
>>> another trigger)?
>>>
>> You have to effectively re-implement what Apache Beam runners do:
>> compute the watermark of the elements, figure out if they are early, on
>> time, or late, and buffer them so that they are grouped together correctly
>> for each window with a StatefulDoFn. This is quite difficult to do in
>> general.
>>
>> This feels like an unnecessarily complex thing to have to implement in
>> order to ensure completeness/correctness, even when you are sure that all
>> the relevant data for a given time period is already in the pipeline.
>>
>> So if you'll humour me one more time, I'll try to explain as concisely as
>> possible (because it just feels wrong that this use case requires the
>> added complexity of manually implemented stateful DoFns).
>>
>>
>>    - I have an unbounded source of events like these:
>>    Event[TimeStamp:Instant,PersonId:String,Amount:Long]
>>    - The source is watermarked on event time (Event.TimeStamp)
>>    - The messages have timestamps applied (Beam timestamp =
>>    Event.TimeStamp)
>>    - Events arrive in order of event time.
>>    - I create aggregates like these:
>>    Aggregate[TimeStamp:Instant,PersonId:String, EventCount:Long,
>>    AmountSum:Long]
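>>
>> (In Java terms, the shapes are roughly the following; field names are
>> approximate:)
>>
>> class Event {
>>   Instant timeStamp;  // event time; also used as the Beam element timestamp
>>   String personId;
>>   long amount;
>> }
>>
>> class Aggregate {
>>   Instant timeStamp;  // max timestamp of the window that produced it
>>   String personId;
>>   long eventCount;
>>   long amountSum;
>> }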
>>
>>
>> I would like to obtain the following:
>>
>>
>> ---------------------------------------------------------------------------------------------
>> | PersonId | Time  | 1Min:EventCount | 1Min:AmountSum | Global:EventCount | Global:AmountSum |
>> ---------------------------------------------------------------------------------------------
>> | Pete     | 09:01 | 3               | 7              | 3                 | 7                |
>> | Pete     | 09:02 | 1               | 2              | 4                 | 9                |
>> | Pete     | 09:03 | 5               | 9              | 9                 | 18               |
>> ---------------------------------------------------------------------------------------------
>> ....
>>
>>
>> A rough starting point for a pipeline is:
>>
>> PCollection<KV<String, Aggregate>> per1MinAggStream =
>>    UnboundedSource<Event>
>>    -> KV.create(e -> e.personId)
>>    -> FixedWindow.of(1m)
>>    -> GroupByKey()
>>    -> AggregateDoFn( -> c.output(key, new
>> Aggregate(window.maxTimestamp(), key, count(), sum())))
>>
>> PCollection<KV<String, Aggregate>> allTimeStream =
>>    per1MinAggStream
>>    -> new GlobalWindow().trigger(??? OnElementCountAtLeast(1) ???)
>>    -> GroupByKey()
>>    -> GlobalAggDoFn( -> c.output(key, new
>> Aggregate(window.maxTimestamp(), key, count(), sum())))
>>
>> // Potentially, re-assign allTimeStream to 1-min windows here
>>
>> results = KeyedPCollectionTuple
>>     .of(oneMinAggsTag, per1MinAggStream)
>>     .and(globalAggsTag, allTimeStream)
>>     .apply(CoGroupByKey.create())
>>     -> doFn()
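>>
>> (For concreteness, the first stage in real Beam Java would look something
>> like this; untested, and AggregateDoFn is the same hypothetical DoFn as
>> above:)
>>
>> PCollection<KV<String, Aggregate>> per1MinAggStream =
>>     events  // PCollection<Event>, timestamps already set to Event.timeStamp
>>         .apply(WithKeys.of((Event e) -> e.personId)
>>             .withKeyType(TypeDescriptors.strings()))
>>         .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
>>         .apply(GroupByKey.create())
>>         .apply(ParDo.of(new AggregateDoFn()));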
>>
>> This works if I can find a trigger for the global window that ensures it
>> does not emit any 1-min aggs to my GlobalAggDoFn while others are still in
>> flight (in processing time, not event time).
>> The 1-min aggs may be out of order in the Iterable<Aggregate> provided to
>> GlobalAggDoFn (I can re-order and emit the totals with the correct
>> timestamps, then assign them to new 1-min windows).
>> However, if the GlobalWindow triggers while the Iterable<Aggregate> is
>> still missing 1-min aggs, the global aggregates cannot be guaranteed to be
>> correct/complete at a given point in time.
>>
>> An example:
>> 1) per1MinAggStream has received all events up to 09:03.
>> 2) The 1-minute windows have all triggered and are creating their
>> aggregates.
>> 3) Some of the 1-minute aggregates complete their calcs and are streamed
>> to the GlobalWindow.
>> 4) The AggregateDoFn that would create Aggregate[Pete:09:02,X,Y] is
>> running slow.
>> 5) The GlobalWindow triggers and sends the following to the
>> GlobalAggDoFn: Iterable<Aggregate> = [Aggregate[Pete:09:03,X,Y],
>> Aggregate[Pete:09:01,X,Y]]
>> 6) I can construct a global total up to 09:01, emit that and re-window,
>> but I cannot do anything further, as I'm unable to tell whether there is
>> actually no data for 09:02 or whether it has simply not yet arrived at the
>> windowing function (all sorts of complications ensue).
>>
>>
>> It just feels strange that even though I'm trying to tie everything to
>> event time, when it comes to the global window I have to work around it and
>> start taking processing time into account (unless I get super fancy with
>> stateful DoFns, I cannot get to a state, or find triggers, that allow me to
>> remain consistent over event time).
>>
>> This sort of leads me back to the question: is there a trigger that
>> ensures the GlobalWindow releases events (aggregates in this case) to the
>> GlobalAggDoFn only once it knows that no earlier events will arrive at the
>> function? (We have a watermarked source, with messages arriving in order
>> according to their event times.) I've not seen any triggers that would do
>> this, unless, as I asked earlier, the GlobalWindowFn somehow only emits
>> events when they are complete in processing time as well as event time.
>>
>> Many thanks for all the help thus far.
>> Stephan
>>
>> On Thu, Jun 7, 2018 at 1:03 AM Raghu Angadi <[email protected]> wrote:
>>
>>> > * the PCollection has a notion of "which elements it has emitted for a
>>> given position of the watermark"
>>> This is not correct (it reads to me like you are saying something
>>> close to 'a PCollection is a stream of (element, watermark) tuples').
>>> Every element in a PCollection has an associated event timestamp,
>>> i.e. it is a collection of (element, timestamp) tuples. A watermark
>>> is not associated with a PCollection, and is completely independent of
>>> event timestamps.
>>>
>>> IOW, a watermark is by definition monotonic. When a stage in your pipeline
>>> sets its watermark to X, what it means is that each of its inputs (sources
>>> like PubSub, or upstream stages) has communicated a watermark timestamp Y,
>>> saying it expects all of its future elements to have event timestamp >= Y,
>>> and X = min(Y) over all inputs. A custom source that receives events with
>>> monotonically increasing timestamps can just report the timestamp of the
>>> last element emitted as its watermark.
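>>>
>>> For example, such a source's reader could do roughly this (a sketch;
>>> lastEmittedTimestamp is a hypothetical field the reader maintains):
>>>
>>> // In a custom UnboundedSource.UnboundedReader; safe only because this
>>> // source's elements arrive in event-time order:
>>> @Override
>>> public Instant getWatermark() {
>>>   return lastEmittedTimestamp != null
>>>       ? lastEmittedTimestamp
>>>       : BoundedWindow.TIMESTAMP_MIN_VALUE;
>>> }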
>>>
>>> OnElementCountAtLeast(1) has no dependence on watermark progression,
>>> since the trigger does not depend on time.
>>>
>>>
>>> On Mon, Jun 4, 2018 at 12:07 PM Stephan Kotze <[email protected]>
>>> wrote:
>>>
>>>> Hi there.
>>>>
>>>> I have a question regarding the completeness of results in a
>>>> GlobalWindow for a pipeline which receives all events in order. (no
>>>> late/missing data).
>>>>
>>>> The question is based on reasoning about Beam that takes 3 pieces of
>>>> (our current) understanding into account:
>>>>
>>>> 1)Global Window Watermark
>>>> As I understand it, a PCollection with a GlobalWindow and ParDo will be
>>>> running a watermark (which is what allows timers in stateful DoFns to
>>>> fire, for example).
>>>>
>>>> If this is the case:
>>>>  * the PCollection has a notion of "which elements it has emitted for a
>>>> given position of the watermark"
>>>>  * the PCollection will also know which results from upstream
>>>> PTransforms/PCollections etc. are still in flight
>>>>  * the PCollection will emit results and update its watermark once
>>>> upstream elements have all provided their results and shifted their
>>>> watermarks.
>>>>
>>>> 2) Triggering on the watermark
>>>>  For fixed windows, for example, we have the
>>>> AfterWatermark.pastEndOfWindow() trigger.
>>>>  In the case of global windows, however, the GlobalWindow never "ends",
>>>> so the watermark cannot progress past this point, and we'll never get any
>>>> results for something like
>>>> new GlobalWindow().trigger(AfterWatermark.pastEndOfWindow())
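>>>>
>>>> i.e., as far as I can tell, something like the following would never emit
>>>> (a sketch using the real API):
>>>>
>>>> Window.<Aggregate>into(new GlobalWindows())
>>>>     .triggering(AfterWatermark.pastEndOfWindow())
>>>>     .withAllowedLateness(Duration.ZERO)
>>>>     .discardingFiredPanes()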
>>>>
>>>> 3) Ordering between upstream PCollections/PTransforms and GlobalWindows
>>>> In the following pipeline: Source -> fixedWindow(1m) ->
>>>> GlobalWindow(), the 1-min segments can arrive out of order in the global
>>>> window, even if the source was ordered (with no late data).
>>>>
>>>> Thus the questions.
>>>>
>>>> If I have a ParDo on the GlobalWindow that is triggered by
>>>> OnElementCountAtLeast(1) and events can arrive out of order, how can the
>>>> ParDo have a watermark that moves only forward when it's possible to
>>>> trigger on any number of elements having arrived (this would appear to
>>>> allow the emission of two later events, followed by an earlier event for
>>>> another trigger)?
>>>>
>>>> Or?
>>>>
>>>> Does the OnElementCountAtLeast only trigger once ALL upstream elements
>>>> up to and including the watermark have arrived? (Though they may be
>>>> unordered in the DoFn's input, for example, it is still a complete list of
>>>> all upstream-produced elements between the last watermark and the "new" one
>>>> that will be set once the ParDo has completed.)
>>>>
>>>> I stress the point because it has some important repercussions for us
>>>> (so I'm just paraphrasing the question slightly below, to try and make it
>>>> as clear as possible :))
>>>>
>>>> How can a PTransform/PCollection on a global window have a monotonic
>>>> watermark if out-of-order events can trigger calcs (when using
>>>> a trigger such as OnElementCountAtLeast(1))? Or is there a guarantee that,
>>>> when the trigger fires, we will receive a complete list of upstream results
>>>> up to the time of the latest event in the collection we receive to process?
>>>>
>>>> Hopefully I've explained the question concisely enough :)
>>>>
>>>> Stephan
>>>>
>>>>
