OK, the only way to do this would be via a non-deterministic stateful
DoFn that buffers elements as they come in and computes averages by
looking at the buffer each time.
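A minimal, framework-free Python sketch of that buffering approach follows. It is not Beam code. `ArrivalOrderAverager` and `LOOKBACK` are hypothetical names, the 10-minute lookback is assumed from the example later in this thread, and each call to `process` stands in for one element reaching a stateful DoFn. In Beam the buffer would live in per-key state, so all elements would have to share a key.

```python
from datetime import timedelta

# Assumed lookback, taken from the 10-minute example in this thread.
LOOKBACK = timedelta(minutes=10)


class ArrivalOrderAverager:
    """Hypothetical stand-in for a stateful DoFn: buffers every element it
    sees and, on each arrival, averages the buffered values whose event
    timestamps fall in [ts - LOOKBACK, ts]."""

    def __init__(self):
        # (event_timestamp, value) pairs in arrival order. Nothing is evicted:
        # without a watermark, an old element may still be needed later.
        self.buffer = []

    def process(self, element_id, ts, value):
        self.buffer.append((ts, value))
        in_range = [v for t, v in self.buffer if ts - LOOKBACK <= t <= ts]
        # Only elements that have already arrived are counted, so the
        # output depends on arrival order (the non-determinism).
        return element_id, sum(in_range) / len(in_range)


# Arrival order from the thread's example (out of order in event time).
arrivals = [
    ("UUID3", timedelta(minutes=8), 6),
    ("UUID4", timedelta(minutes=13), 4),
    ("UUID1", timedelta(minutes=0), 1),
    ("UUID2", timedelta(minutes=2), 2),
]

averager = ArrivalOrderAverager()
outputs = [averager.process(*a) for a in arrivals]
print(outputs)
# [('UUID3', 6.0), ('UUID4', 5.0), ('UUID1', 1.0), ('UUID2', 1.5)]
```

Replaying the thread's arrival order reproduces its outputs (6, 5, 1, 1.5). Feeding the same four elements in a different order would give different averages, which is exactly the order sensitivity discussed below.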

This could also be represented with an extension to window merging and
a join, where the trigger would be explicitly used to control the
balance between latency and correctness.

On Fri, Oct 11, 2019 at 8:01 AM Sam Stephens <[email protected]> wrote:
>
> On 2019/10/10 18:23:46, Eugene Kirpichov <[email protected]> wrote:
> > " input elements can pass through the Joiner DoFn before the sideInput
> > corresponding to that element is present"
> >
> > I don't think this is correct. Runners will evaluate a DoFn with side
> > inputs on elements in a given window only after all side inputs are ready
> > (have triggered at least once) in this window, so your code should be safe.
> > However, runners will not rerun the DoFn with side inputs on subsequent
> > triggerings of the side inputs, so you won't be able to update the results.
>
> Yes, but the second or third time an element falling into a given window is
> processed by the Joiner DoFn, the side input may not be up to date with these
> new elements, so the side input having triggered at least once is no
> guarantee that it is up to date.
>
> On 2019/10/10 18:35:21, Robert Bradshaw <[email protected]> wrote:
>
> > Time: 00:08:00
> > Input: <UUID3, 6>
> Output: <UUID3, 6>
>
> > Time: 00:13:00
> > Input: <UUID4, 4>
> Output: <UUID4, 5> // average 4 & 6
>
> > Time: 00:00:00
> > Input: <UUID1, 1>
> Output: <UUID1, 1> // average 1
>
> > Time: 00:02:00
> > Input: <UUID2, 2>
> Output: <UUID2, 1.5> // average 1 & 2
>
> I'd say the least surprising result here is that the aggregate includes the
> best available information at the time of processing. So yes, it is sensitive
> to the order of arrival; that's unavoidable, I think.
>
> >
> > Are you really trying to emit elements with the mean of all elements
> > with timestamps up to 10 minutes prior to the current value? That's a
> > bit different from sliding windows. In that case you could do
> > something with a stateful DoFn that buffers elements and, for each
> > incoming element, sets a timer at T which then reads the buffer,
> > computes the output, and discards elements older than 10 minutes. You
> > could also possibly do this with a custom WindowFn.
> >
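A framework-free Python simulation of the timer idea follows; again it is not Beam code, and `TimerAverager`, `advance_watermark` and `LOOKBACK` are hypothetical. It assumes event-time timers that fire in timestamp order once the watermark reaches T, and no late data. Keeping one pending timer per element in a heap is likely a simplification: a real DoFn would probably need to keep the pending timestamps in state itself.

```python
import heapq
from datetime import timedelta

# Assumed lookback, as in the thread's 10-minute example.
LOOKBACK = timedelta(minutes=10)


class TimerAverager:
    """Hypothetical simulation of a stateful DoFn with event-time timers:
    each element is buffered and registers a timer at its own timestamp T.
    The timer fires only when the watermark reaches T, so by then the buffer
    holds every (non-late) element with timestamp <= T."""

    def __init__(self):
        self.buffer = []  # (event_timestamp, value) pairs
        self.timers = []  # min-heap of (fire_time, element_id)

    def process(self, element_id, ts, value):
        self.buffer.append((ts, value))
        heapq.heappush(self.timers, (ts, element_id))

    def advance_watermark(self, watermark):
        """Fire, in timestamp order, every timer at or before the watermark."""
        fired = []
        while self.timers and self.timers[0][0] <= watermark:
            t, element_id = heapq.heappop(self.timers)
            in_range = [v for ts, v in self.buffer if t - LOOKBACK <= ts <= t]
            fired.append((element_id, sum(in_range) / len(in_range)))
            # Timers fire in increasing T and there is no late data, so no
            # later timer can need anything older than t - LOOKBACK.
            self.buffer = [(ts, v) for ts, v in self.buffer if ts >= t - LOOKBACK]
        return fired


# Same arrival order as the thread's example.
arrivals = [
    ("UUID3", timedelta(minutes=8), 6),
    ("UUID4", timedelta(minutes=13), 4),
    ("UUID1", timedelta(minutes=0), 1),
    ("UUID2", timedelta(minutes=2), 2),
]

averager = TimerAverager()
for element in arrivals:
    averager.process(*element)

# Advance the simulated watermark past all four timestamps.
outputs = averager.advance_watermark(timedelta(minutes=15))
print(outputs)
# [('UUID1', 1.0), ('UUID2', 1.5), ('UUID3', 3.0), ('UUID4', 5.0)]
```

Here UUID3 averages 1, 2 and 6 instead of being emitted as 6 on arrival, and the result no longer depends on arrival order. The price is latency: nothing is emitted until the watermark catches up.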
>
> Yes the requirement is basically to enrich an event stream with values 
> computed over arbitrary other event streams (including the event stream being 
> enriched) and to do this with as low latency as possible.
>
> Of course, the values derived from other event streams might not be included
> even if they occur before the event being enriched (even if "before" is in
> both the event-time and processing-time sense). But this is easier to swallow
> because there's no obvious causal dependency between that aggregate value and
> the event being enriched.
>
> I hope that made sense.
