> * the PCollection has a notion of "which elements it has emitted for a
> given position of the watermark"
This is not correct (to me it reads as if you are saying something
close to 'a PCollection is a stream of (element, watermark) tuples').
Every element in a PCollection has an associated event timestamp, so a
PCollection is a collection of (element, timestamp) tuples. The watermark
is not associated with a PCollection, and it is completely independent of
the event timestamp.

IOW, the watermark is by definition monotonic. When a stage in your
pipeline sets its watermark to 'X', it means that each of its inputs
(sources like PubSub, or upstream stages) has communicated a watermark
timestamp Y saying it expects all of its future elements to have event
timestamp >= Y, and X = min(Y) over those inputs. A custom source that
receives events with monotonically increasing timestamps can just report
the timestamp of the last element emitted as its watermark.
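
To make the X = min(Y) rule concrete, here is a minimal sketch in plain
Python. It is a toy model only, not Beam's API: the names StageWatermark,
report, current and ordered_source_watermark are hypothetical, and
timestamps are plain integers.

```python
from typing import Dict, Iterable, List, Optional


class StageWatermark:
    """Toy model of one stage's watermark. Hypothetical class, not a Beam API."""

    def __init__(self, input_names: Iterable[str]) -> None:
        # Latest watermark Y reported by each input; None until it first reports.
        self._inputs: Dict[str, Optional[int]] = {name: None for name in input_names}

    def report(self, name: str, y: int) -> Optional[int]:
        """Record watermark y from one input and return the stage watermark X."""
        previous = self._inputs[name]
        # An input's watermark never moves backwards, so keep the larger value.
        self._inputs[name] = y if previous is None else max(previous, y)
        return self.current()

    def current(self) -> Optional[int]:
        """X = min(Y) over all inputs, or None while any input is still silent."""
        values = list(self._inputs.values())
        if any(v is None for v in values):
            return None
        return min(values)


def ordered_source_watermark(emitted_timestamps: List[int]) -> Optional[int]:
    """For a source whose event timestamps only increase, the watermark can be
    the timestamp of the last element emitted."""
    return emitted_timestamps[-1] if emitted_timestamps else None
```

Because each input's Y only moves forward, the minimum over the inputs
only moves forward too, which is where the monotonicity of X comes from
in this model.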

OnElementCountAtLeast(1) has no dependence on watermark progression, since
that trigger does not depend on time.
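
As a rough illustration of that point, here is a toy count-based trigger
in plain Python. It is a sketch under my own assumptions, not how Beam
implements triggers: CountTrigger and on_element are hypothetical names.
The firing decision looks only at how many elements are buffered and never
reads a watermark or an event timestamp.

```python
from typing import Any, List, Optional, Tuple

# An element paired with its event timestamp, as in a PCollection.
Timestamped = Tuple[Any, int]


class CountTrigger:
    """Toy element-count trigger. Hypothetical class, not a Beam API."""

    def __init__(self, threshold: int) -> None:
        self.threshold = threshold
        self._buffer: List[Timestamped] = []

    def on_element(self, element: Any, event_timestamp: int) -> Optional[List[Timestamped]]:
        """Buffer the element; emit a pane once the count threshold is reached."""
        self._buffer.append((element, event_timestamp))
        # The timestamp is carried along but plays no part in the decision.
        if len(self._buffer) >= self.threshold:
            pane, self._buffer = self._buffer, []
            return pane
        return None


# Elements arriving out of event-time order still fire one pane each.
trigger = CountTrigger(threshold=1)
arrivals = [("a", 5), ("b", 9), ("c", 3)]
panes = [trigger.on_element(element, ts) for element, ts in arrivals]
```

In this model the panes come out in arrival order, whatever the event
timestamps are, and no watermark is consulted at any point.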


On Mon, Jun 4, 2018 at 12:07 PM Stephan Kotze <[email protected]>
wrote:

> Hi there.
>
> I have a question regarding the completeness of results in a GlobalWindow
> for a pipeline which receives all events in order. (no late/missing data).
>
> The question is based on reasoning about Beam that takes 3 pieces of (our
> current) understanding into account:
>
> 1)Global Window Watermark
> As I understand it a PCollection with a GlobalWindow and ParDo will be
> running a watermark (which is what allows Triggers in stateful DoFns to
> fire for example).
>
> If this is the case,
>  * the PCollection has a notion of "which elements it has emitted for a
> given position of the watermark"
>  * the PCollection will also know which results from upstream
> PTransforms/Pcollections etc. are still in flight
>  * the PCollection will emit results and update its watermark once
> Upstream elements have all provided their results and shifted their
> watermarks.
>
> 2) Triggering on Watermark
>  For Fixed windows for example we have the
> "AfterWatermark".pastEndOfWindow() trigger.
>  In the case of Global windows however, the GlobalWindow never "ends", so
> the watermark cannot progress past this point and we'll never get any
> results for something like
> newGlobalWindow().trigger(AfterWatermark.pastEndOfWindow())
>
> 3) Ordering between upstream PCollections/PTransforms and GlobalWindows
> In the following pipeline:  Source -> fixedWindow(1m) -> GlobalWindow(),
> the 1Min segments can arrive out of order in the global window, even if the
> source was ordered (with no late data)
>
> Thus the questions.
>
> If I have a ParDo on the GlobalWindow that is triggered by
> OnElementCountAtLeast(1)  and events can arrive out of order, how can the
> ParDo have a watermark that moves only forward when it's possible To
> trigger on any amount of elements having arrived (this would appear to
> allow the emission of 2 later events, followed by an earlier event for
> another trigger).
>
> Or?
>
> Does the OnElementCountAtLeast only trigger once ALL upstream elements up
> to and including the watermark have arrived? (Though they may be unordered
> in the DoFn's input for example, it is still a complete list with All
> upstream produced elements between the last watermark and the "new" one
> that will be set once the ParDo has completed).
>
> I stress the point because it has some important repercussions for us (so
> I'm just paraphrasing the question slightly below, to try and make it as
> clear as possible :))
>
> How can a PTransform/PCollection on a Global Window have a monotonic
> watermark if events can trigger calcs with out of order events (when using
> a trigger such as OnElementCountAtLeast(1)), or is there a Guarantee, that
> when the trigger fires, we will receive a complete list of upstream results
> up to the time of the latest event in the Collection we receive to process.
>
> Hopefully I've explained the question concisely enough :)
>
> Stephan
>
>
