This seems like a great example of a use for stateful DoFn. It has essentially
the same structure as the example on the Beam blog, but is more meaningful.
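A minimal plain-Python model of the arrival-driven approach described in the quoted thread (buffer each element in per-key state and recompute the average from the buffer every time an element arrives) may help make the order sensitivity concrete. It is not Beam API code: the function name `enrich_on_arrival`, the `(key, event_id, timestamp_minutes, value)` tuple layout and the inclusive 10-minute look-back are assumptions for illustration. In an actual stateful DoFn the per-key list would be bag state. Fed the arrival order from the example in the thread, it reproduces the outputs given there.

```python
from collections import defaultdict

# Assumed look-back horizon ("up to 10 minutes prior"), in minutes.
WINDOW_MINUTES = 10


def enrich_on_arrival(events):
    """Hypothetical model of a non-deterministic stateful DoFn.

    `events` is a list of (key, event_id, timestamp_minutes, value) tuples in
    *arrival* order. Each element is emitted immediately, averaged with every
    already-buffered element for its key whose timestamp lies in
    [ts - WINDOW_MINUTES, ts]. Returns (event_id, average) pairs.
    """
    buffers = defaultdict(list)  # stands in for per-key bag state
    outputs = []
    for key, event_id, ts, value in events:
        buffer = buffers[key]
        buffer.append((ts, value))
        # Only what has arrived so far is visible, so the result depends on order.
        in_range = [v for t, v in buffer if ts - WINDOW_MINUTES <= t <= ts]
        outputs.append((event_id, sum(in_range) / len(in_range)))
    return outputs


# Arrival order from the thread: UUID3 and UUID4 arrive before UUID1 and UUID2.
arrivals = [
    ("k", "UUID3", 8, 6),
    ("k", "UUID4", 13, 4),
    ("k", "UUID1", 0, 1),
    ("k", "UUID2", 2, 2),
]
print(enrich_on_arrival(arrivals))
# [('UUID3', 6.0), ('UUID4', 5.0), ('UUID1', 1.0), ('UUID2', 1.5)]
```

UUID3 is averaged alone even though UUID1 and UUID2 fall inside its 10-minute look-back, because they had not arrived yet; that is the order sensitivity discussed in the thread. This model also never evicts anything: without a watermark it cannot tell when an old element is no longer needed.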

Kenn

On Fri, Oct 11, 2019 at 12:38 PM Robert Bradshaw wrote:

> OK, the only way to do this would be via a non-deterministic stateful
> DoFn that buffers elements as they come in and computes averages by
> looking at the buffer each time.
>
> This could also be represented with an extension to window merging and
> a join, where the trigger would be explicitly used to control the
> balance between latency and correctness.
>
> On Fri, Oct 11, 2019 at 8:01 AM Sam Stephens wrote:
> >
> > On 2019/10/10 18:23:46, Eugene Kirpichov wrote:
> > > "input elements can pass through the Joiner DoFn before the sideInput
> > > corresponding to that element is present"
> > >
> > > I don't think this is correct. Runners will evaluate a DoFn with side
> > > inputs on elements in a given window only after all side inputs are
> > > ready (have triggered at least once) in this window, so your code
> > > should be safe. However, runners will not rerun the DoFn with side
> > > inputs on subsequent triggerings of the side inputs, so you won't be
> > > able to update the results.
> >
> > Yes, but the second or third time an element falling into a given
> > window is processed by the Joiner DoFn, the side input may not be up to
> > date with these new elements, so the side input having triggered at
> > least once is not a guarantee that it is up to date.
> >
> > On 2019/10/10 18:35:21, Robert Bradshaw wrote:
> >
> > > Time: 00:08:00
> > > Input: <UUID3, 6>
> >
> > Output: <UUID3, 6>
> >
> > > Time: 00:13:00
> > > Input: <UUID4, 4>
> >
> > Output: <UUID4, 5> // average 4 & 6
> >
> > > Time: 00:00:00
> > > Input: <UUID1, 1>
> >
> > Output: <UUID1, 1> // average 1
> >
> > > Time: 00:02:00
> > > Input: <UUID2, 2>
> >
> > Output: <UUID2, 1.5> // average 1 & 2
> >
> > I'd say the least surprising result here is that the aggregate includes
> > the best information available at the time of processing. So yes, it is
> > sensitive to the order of arrival; that's unavoidable, I think.
> >
> > >
> > > Are you really trying to emit elements with the mean of all elements
> > > with timestamp up to 10 minutes prior to the current value? That's a
> > > bit different from sliding windows. In that case you could do
> > > something with a stateful DoFn that buffers elements and, for each
> > > incoming element, sets a timer at its timestamp T which then reads the
> > > buffer, computes the output, and discards elements older than 10
> > > minutes. You could also possibly do this with a custom WindowFn.
> > >
> >
> > Yes, the requirement is basically to enrich an event stream with values
> > computed over arbitrary other event streams (including the event stream
> > being enriched), and to do this with as low latency as possible.
> >
> > Of course, the values derived from other event streams might not be
> > included even if they occur before the event being enriched (even if
> > "before" holds in both the event-time and processing-time sense). But
> > this is easier to swallow because there's no obvious causal dependency
> > between that aggregate value and the event being enriched.
> >
> > ... I hope that made sense.
>
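The timer-based alternative suggested in the thread (buffer elements, set a timer at each element's timestamp T, and when it fires read the buffer, compute the output and discard elements older than 10 minutes) can be modeled the same way. Again this is a plain-Python sketch, not Beam code: `enrich_on_timer`, the explicit list of watermark values and the rule that a timer fires once the watermark has moved strictly past T are assumptions. It also assumes no element ever arrives behind the watermark. In Beam the buffer would be bag state and the timer an event-time timer.

```python
import heapq
from collections import defaultdict

# Assumed look-back horizon ("up to 10 minutes prior"), in minutes.
WINDOW_MINUTES = 10


def enrich_on_timer(events, watermarks):
    """Hypothetical model of a stateful DoFn that defers output to timers.

    `events` are (key, event_id, timestamp_minutes, value) tuples in arrival
    order; `watermarks[i]` is the (assumed non-decreasing) input watermark
    after event i arrives. Assumes no element arrives behind the watermark.
    Returns (event_id, average) pairs in the order the timers fire.
    """
    buffers = defaultdict(list)  # per-key bag state: (ts, value)
    timers = []                  # min-heap of (fire_ts, key, event_id)
    outputs = []
    for (key, event_id, ts, value), wm in zip(events, watermarks):
        buffers[key].append((ts, value))
        heapq.heappush(timers, (ts, key, event_id))  # timer at T = ts
        # Fire, in timestamp order, every timer the watermark has passed.
        while timers and timers[0][0] < wm:
            fire_ts, k, eid = heapq.heappop(timers)
            buffer = buffers[k]
            lo = fire_ts - WINDOW_MINUTES
            in_range = [v for t, v in buffer if lo <= t <= fire_ts]
            outputs.append((eid, sum(in_range) / len(in_range)))
            # Later timers fire at >= fire_ts, so elements older than lo are dead.
            buffers[k] = [(t, v) for t, v in buffer if t >= lo]
    return outputs


arrivals = [
    ("k", "UUID3", 8, 6),
    ("k", "UUID4", 13, 4),
    ("k", "UUID1", 0, 1),
    ("k", "UUID2", 2, 2),
]
# Watermark holds at 0 until input is complete, then jumps to +infinity.
print(enrich_on_timer(arrivals, [0, 0, 0, float("inf")]))
# [('UUID1', 1.0), ('UUID2', 1.5), ('UUID3', 3.0), ('UUID4', 5.0)]
```

Compared with the arrival-driven version, UUID3 now sees UUID1 and UUID2 and gets 3.0 rather than 6, at the cost of emitting nothing until the watermark advances; this is the latency-versus-correctness balance raised in the thread. Because timers fire in timestamp order, eviction is safe: once the timer at T has fired, no later timer for that key needs elements older than T minus 10 minutes.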
