On Mon, Jun 4, 2018 at 12:07 PM Stephan Kotze <[email protected]>
wrote:

> Hi there.
>
> I have a question regarding the completeness of results in a GlobalWindow
> for a pipeline which receives all events in order. (no late/missing data).
>
> The question is based on reasoning about Beam that takes 3 pieces of (our
> current) understanding into account:
>
> 1) Global Window Watermark
> As I understand it, a PCollection with a GlobalWindow and a ParDo will be
> running a watermark (which is what allows, for example, Triggers in stateful
> DoFns to fire).
>
> If this is the case,
>  * the PCollection has a notion of "which elements it has emitted for a
> given position of the watermark";
>  * the PCollection will also know which results from upstream
> PTransforms/PCollections etc. are still in flight;
>  * the PCollection will emit results and update its watermark once
> upstream elements have all provided their results and shifted their
> watermarks.
>
>
This is up to the runner to implement, but from what I recall, all runners
just have the watermark start at -infinity for a PCollection and stay at
-infinity while elements are being consumed from that PCollection. For
bounded PCollections, this jumps to +infinity when there are no more
elements to consume, which means that watermark-based triggers all "fire" at
the same time. For unbounded PCollections, the watermark stays at -infinity
forever and hence watermark-based triggers will not fire.
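To make the description above concrete, here is a toy Python model of the watermark behaviour as described in this reply. Runners may differ, so treat it as an illustration rather than as how any particular runner works. `watermark_after` is a hypothetical helper, and `total=None` stands in for an unbounded PCollection.

```python
import math
from typing import Optional


def watermark_after(consumed: int, total: Optional[int]) -> float:
    """Toy model of the watermark described above; not a Beam API.

    consumed: number of elements consumed from the PCollection so far.
    total: number of elements in a bounded PCollection, or None for an
           unbounded PCollection (there is no "last" element).
    """
    if total is not None and consumed >= total:
        # Bounded input fully consumed: the watermark jumps to +infinity.
        return math.inf
    # Still consuming (or unbounded): the watermark stays at -infinity.
    return -math.inf


bounded_trace = [watermark_after(i, total=3) for i in range(4)]
unbounded_trace = [watermark_after(i, total=None) for i in range(4)]
print(bounded_trace)    # [-inf, -inf, -inf, inf]
print(unbounded_trace)  # [-inf, -inf, -inf, -inf]
```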


> 2) Triggering on Watermark
>  For fixed windows, for example, we have the
> AfterWatermark.pastEndOfWindow() trigger.
>  In the case of global windows, however, the GlobalWindow never "ends", so
> the watermark cannot progress past this point and we'll never get any
> results for something like
> new GlobalWindow().trigger(AfterWatermark.pastEndOfWindow())
>

The GlobalWindow does end for bounded PCollections, since the watermark
advances to +infinity when we run out of elements to consume. For unbounded
PCollections, you're correct: the GlobalWindow never "ends".
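Continuing the same toy model, a trigger in the spirit of AfterWatermark.pastEndOfWindow() fires at the first point the watermark reaches the window's end. The names here are hypothetical, not Beam API. The global window's end is modelled as +infinity, which is a simplification. This shows why such a trigger fires only for bounded input. It also shows why a fixed window fires at the same step as the global window when the watermark jumps straight from -infinity to +infinity.

```python
import math
from typing import List, Optional

# Simplification: the real GlobalWindow has a finite maximum timestamp, but in
# this toy model its end is +infinity.
GLOBAL_WINDOW_END = math.inf


def past_end_of_window_fires_at(watermarks: List[float], window_end: float) -> Optional[int]:
    """Index of the first watermark value at or past `window_end`, i.e. where a
    trigger in the spirit of AfterWatermark.pastEndOfWindow() would fire in
    this toy model; None if it never fires."""
    for i, wm in enumerate(watermarks):
        if wm >= window_end:
            return i
    return None


bounded = [-math.inf, -math.inf, -math.inf, math.inf]  # input exhausted at step 3
unbounded = [-math.inf] * 4                           # never exhausted

print(past_end_of_window_fires_at(bounded, GLOBAL_WINDOW_END))    # 3
print(past_end_of_window_fires_at(unbounded, GLOBAL_WINDOW_END))  # None
print(past_end_of_window_fires_at(bounded, 60.0))                 # 3 (fixed window ending at t=60)
```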


> 3) Ordering between upstream PCollections/PTransforms and GlobalWindows
> In the following pipeline: Source -> fixedWindow(1m) -> GlobalWindow(),
> the 1-minute segments can arrive out of order in the global window, even if
> the source was ordered (with no late data).
>

Triggers control when things appear in the GlobalWindow, and they only
"fire" at a GroupByKey.
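Here is a rough sketch of the out-of-order case. It assumes a toy GroupByKey in the global window with an element-count trigger of 1 in discarding mode. `ToyCountTriggeredGbk` and the arrival order are made up for illustration. Beam does not promise any particular order in which upstream results reach the downstream GroupByKey. Each arrival fires its own pane immediately, so a later minute can be emitted before an earlier one.

```python
from dataclasses import dataclass, field
from typing import List, Tuple

# An element is (window_start_minute, value) as produced by the upstream
# 1-minute fixed-window stage.
Element = Tuple[int, str]


@dataclass
class ToyCountTriggeredGbk:
    """Hypothetical stand-in for a GroupByKey in the global window with an
    element-count trigger (discarding mode). Not a Beam class."""
    min_count: int = 1
    buffer: List[Element] = field(default_factory=list)
    panes: List[List[Element]] = field(default_factory=list)

    def on_element(self, element: Element) -> None:
        self.buffer.append(element)
        if len(self.buffer) >= self.min_count:
            # The trigger fires: emit whatever has arrived so far, in arrival order.
            self.panes.append(self.buffer)
            self.buffer = []


gbk = ToyCountTriggeredGbk(min_count=1)
# Assumed arrival order: the result for minute 0 shows up after minutes 1 and 2.
for element in [(1, "b"), (2, "c"), (0, "a")]:
    gbk.on_element(element)

print(gbk.panes)  # [[(1, 'b')], [(2, 'c')], [(0, 'a')]]
```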


> Thus the questions.
>
> If I have a ParDo on the GlobalWindow that is triggered by
> OnElementCountAtLeast(1) and events can arrive out of order, how can the
> ParDo have a watermark that only moves forward when it's possible to
> trigger on any number of elements having arrived? (This would appear to
> allow the emission of 2 later events, followed by an earlier event for
> another trigger.)
>

You would effectively have to re-implement what Apache Beam runners do:
compute the watermark of the elements, figure out whether they are early,
on time, or late, and buffer them in a StatefulDoFn so that they are grouped
together correctly for each window. This is quite difficult to do in
general.
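Here is a minimal sketch of the kind of buffering described above. It rests on several assumptions:

* It uses a plain Python class rather than a real stateful DoFn, which would keep the buffers in state and use event-time timers.
* Windows are 60 seconds wide.
* The watermark is a heuristic: the largest timestamp seen minus `allowed_delay`.

`WindowBuffer`, `allowed_delay` and `finish()` are hypothetical names. Only on-time and late panes are modelled; early firings are left out.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Element = Tuple[int, str]  # (event timestamp in seconds, value)


class WindowBuffer:
    """Hypothetical sketch of per-window buffering; not a Beam API.

    State is a plain dict here; a real stateful DoFn would keep it in Beam
    state and use event-time timers instead.
    """

    def __init__(self, window_size: int, allowed_delay: int = 0) -> None:
        self.window_size = window_size
        self.allowed_delay = allowed_delay  # heuristic slack for the watermark
        self.watermark = float("-inf")
        self.buffers: Dict[int, List[Element]] = defaultdict(list)
        self.emitted: List[Tuple[str, int, List[Element]]] = []

    def _window_start(self, ts: int) -> int:
        return ts - ts % self.window_size

    def process(self, ts: int, value: str) -> None:
        start = self._window_start(ts)
        if start + self.window_size <= self.watermark:
            # The watermark already passed this window's end: the element is late.
            self.emitted.append(("LATE", start, [(ts, value)]))
            return
        self.buffers[start].append((ts, value))
        # Heuristic watermark: largest timestamp seen minus the allowed delay.
        self._advance_watermark(ts - self.allowed_delay)

    def finish(self) -> None:
        # End of a bounded input: the watermark jumps to +infinity.
        self._advance_watermark(float("inf"))

    def _advance_watermark(self, candidate: float) -> None:
        # The watermark only ever moves forward.
        self.watermark = max(self.watermark, candidate)
        for start in sorted(self.buffers):
            if start + self.window_size <= self.watermark:
                # Window complete: emit its elements sorted by event time.
                self.emitted.append(("ON_TIME", start, sorted(self.buffers.pop(start))))


buf = WindowBuffer(window_size=60)
for ts, value in [(5, "a"), (30, "b"), (65, "c"), (20, "d"), (130, "e")]:
    buf.process(ts, value)
buf.finish()
for pane in buf.emitted:
    print(pane)
# ('ON_TIME', 0, [(5, 'a'), (30, 'b')])
# ('LATE', 0, [(20, 'd')])
# ('ON_TIME', 60, [(65, 'c')])
# ('ON_TIME', 120, [(130, 'e')])
```

With `allowed_delay=10`, the element at t=20 would still be buffered into the first window instead of being classed as late. This is the usual trade-off between latency and completeness that a hand-rolled watermark forces you to choose.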


>
> Or?
>
> Does the OnElementCountAtLeast trigger only fire once ALL upstream elements
> up to and including the watermark have arrived? (Though they may be
> unordered in the DoFn's input, for example, it would still be a complete
> list of all upstream-produced elements between the last watermark and the
> "new" one that will be set once the ParDo has completed.)
>
>
> I stress the point because it has some important repercussions for us (so
> I'm just paraphrasing the question slightly below, to try to make it as
> clear as possible :))
>
> How can a PTransform/PCollection on a global window have a monotonic
> watermark if out-of-order events can trigger calculations (when using a
> trigger such as OnElementCountAtLeast(1))? Or is there a guarantee that,
> when the trigger fires, we will receive a complete list of upstream results
> up to the time of the latest event in the collection we receive to process?
>
> Hopefully I've explained the question concisely enough :)
>
> Stephan
>
>
It is hard to understand your use case. Does your pipeline look like this:
GBK(FixedWindows(1m) + AfterWatermark.pastEndOfWindow()) -> ParDo(A) ->
GBK(GlobalWindow + OnElementCountAtLeast(1)) -> ParDo(B)
Also, can you provide an example of how elements flow through the pipeline,
like "A sees this, then this, then this, which means that B will always see
this, then ..."?
Also, are you talking about an unbounded pipeline?
