On 2019/10/10 18:23:46, Eugene Kirpichov <kirpic...@google.com> wrote: 
> " input elements can pass through the Joiner DoFn before the sideInput
> corresponding to that element is present"
> 
> I don't think this is correct. Runners will evaluate a DoFn with side
> inputs on elements in a given window only after all side inputs are ready
> (have triggered at least once) in this window, so your code should be safe.
> However, runners will not rerun the DoFn with side inputs on subsequent
> triggerings of the side inputs, so you won't be able to update the results.

Yes, but when a second or third element falling into a given window is
processed by the Joiner DoFn, the side input may not yet reflect those new
elements. So the side input having triggered at least once does not guarantee
that it is up to date.

On 2019/10/10 18:35:21, Robert Bradshaw <rober...@google.com> wrote: 

> Time: 00:08:00
> Input: <UUID3, 6>
Output: <UUID3, 6>

> 
> Time: 00:13:00
> Input: <UUID4, 4>

Output: <UUID4, 5> // average 4 & 6

> 
> Time: 00:00:00
> Input: <UUID1, 1>

Output: <UUID1, 1> // average 1
> 
> Time: 00:02:00
> Input: <UUID2, 2>

Output: <UUID2, 1.5> // average 1 & 2

I'd say the least surprising result here is that the aggregate includes the
best available information at the time of processing. So yes, it is sensitive
to the order of arrival; I think that's unavoidable.
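
To make the arrival-order dependence concrete, here is a rough sketch in plain
Python (not Beam code) that reproduces the outputs above. The function name,
the tuple layout, and the inclusive 10-minute lookback are my own assumptions
for illustration only.

```python
WINDOW_SECONDS = 10 * 60  # assumed lookback: 10 minutes, inclusive at both ends


def enrich_in_arrival_order(arrivals):
    """Emit (key, mean) per element, using only elements that have arrived so far.

    arrivals: list of (key, event_time_seconds, value), in arrival order.
    """
    seen = []  # (event_time_seconds, value) of everything that has arrived
    out = []
    for key, ts, value in arrivals:
        seen.append((ts, value))
        # Best available information: elements already seen whose event time
        # lies within the 10 minutes up to and including this element.
        in_range = [v for t, v in seen if ts - WINDOW_SECONDS <= t <= ts]
        out.append((key, sum(in_range) / len(in_range)))
    return out


# Arrival order from the example: 00:08, 00:13, then the late 00:00 and 00:02.
arrivals = [
    ("UUID3", 8 * 60, 6),
    ("UUID4", 13 * 60, 4),
    ("UUID1", 0, 1),
    ("UUID2", 2 * 60, 2),
]
print(enrich_in_arrival_order(arrivals))
# [('UUID3', 6.0), ('UUID4', 5.0), ('UUID1', 1.0), ('UUID2', 1.5)]
```

Had the same four elements arrived in event-time order, UUID3 would have been
emitted with the average of 1, 2 and 6 instead of 6.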

> 
> Are you really trying to emit elements with the mean of all elements
> with timestamp up to 10 minutes prior to the current value? That's a
> bit different than sliding windows. In that case you could do
> something with a Stateful DoFn that buffers elements and for each
> incoming element sets a timer at T which then reads the buffer,
> computes the output, and discards elements older than 10 minutes. You
> could also possibly do this with a custom WindowFn.
> 

Yes, the requirement is basically to enrich an event stream with values
computed over arbitrary other event streams (including the event stream being
enriched), and to do this with as low latency as possible.

Of course, the values derived from other event streams might not be included
even if they occur before the event being enriched (where "before" holds in
both the event-time and processing-time sense). But this is easier to swallow
because there's no obvious causal dependency between that aggregate value and
the event being enriched.

I hope that made sense.
