"input elements can pass through the Joiner DoFn before the sideInput
corresponding to that element is present"

I don't think this is correct. Runners will evaluate a DoFn with side
inputs on elements in a given window only after all side inputs are ready
(have triggered at least once) in this window, so your code should be safe.
However, runners will not rerun the DoFn with side inputs on subsequent
triggerings of the side inputs, so you won't be able to update the results.

On Thu, Oct 10, 2019 at 8:45 AM Sam Stephens <samdjsteph...@gmail.com>
wrote:

> My team and I have been puzzling for a while how to solve a specific
> problem.
>
> Say you have an input stream of tuples:
>
>   <String uuid, Integer value>
>
> And you want to output a stream containing:
>
>   <String uuid, Double average>
>
> Where the average is an aggregation over a 10 minute sliding window of the
> "value" field.
>
> There are a couple of extra requirements:
> A) We want to output a single result for each input tuple
> B) We want to output a result as early as possible after the input arrives
> (low latency)
> C) We want the average value in result_i to have *seen* the value from
> input_i
>
> An illustration of the input stream with corresponding output
>
> Time: 00:00:00
> Input: <UUID1, 1>
> Output: <UUID1, 1.0>
>
> Time: 00:02:00
> Input: <UUID2, 2>
> Output: <UUID2, 1.5>
>
> Time: 00:08:00
> Input: <UUID3, 6>
> Output: <UUID3, 3.0>
>
> Time: 00:13:00
> Input: <UUID4, 4>
> Output: <UUID4, 5.0>
>
> The issue we have is that without some magic tricks and hacky code,
> achieving all 3 extra requirements is tough. A naive solution looks like
> this (Beam pseudo-code):
>
>
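> // Still a sketch: extracting "value" from the tuples, allowed lateness
> // and accumulation mode are omitted.
> PCollectionView<Double> agg = input
>      .apply(Window.into(
>              SlidingWindows.of(Duration.standardMinutes(10))
>                  .every(Duration.standardSeconds(1)))
>          .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))))
>      .apply(Mean.globally())
>      .apply(View.asSingleton());
>
> // withSideInputs is called on the ParDo transform, not on the DoFn.
> PCollection<Tuples> output = input
>      .apply(ParDo.of(new Joiner()).withSideInputs(agg));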
>
>
> The problem is that there's a race condition: input elements can pass
> through the Joiner DoFn before the sideInput corresponding to that element
> is present. This makes solving the A, B, C requirements listed above
> difficult.
>
> Has anyone solved a similar problem to this before? Any neat ideas?
>
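
For reference, the expected output in the quoted illustration can be reproduced outside Beam with a small plain-Java sketch. This is not the pipeline from the thread and not a Beam API: TrailingMean, averages and WINDOW_SECONDS are hypothetical names. It assumes elements arrive in timestamp order and that the window for an element at time t covers (t - 10 min, t], so each result has already seen its own input (requirement C).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-Java illustration only; the names here are hypothetical, not Beam API.
class TrailingMean {
    static final long WINDOW_SECONDS = 10 * 60;

    // times[i] is the arrival time of element i in seconds (ascending),
    // values[i] is its "value" field. Returns one average per input element.
    static List<Double> averages(long[] times, int[] values) {
        Deque<long[]> window = new ArrayDeque<>(); // entries are {time, value}
        long sum = 0;
        List<Double> out = new ArrayList<>();
        for (int i = 0; i < times.length; i++) {
            // Add the current element first so its own value is included.
            window.addLast(new long[] {times[i], values[i]});
            sum += values[i];
            // Evict everything at or before (t - 10 min). The current element
            // always stays, so the deque is never empty here.
            while (window.peekFirst()[0] <= times[i] - WINDOW_SECONDS) {
                sum -= window.removeFirst()[1];
            }
            out.add((double) sum / window.size());
        }
        return out;
    }

    public static void main(String[] args) {
        // Inputs from the illustration: 00:00:00, 00:02:00, 00:08:00, 00:13:00.
        long[] times = {0, 120, 480, 780};
        int[] values = {1, 2, 6, 4};
        System.out.println(averages(times, values)); // prints [1.0, 1.5, 3.0, 5.0]
    }
}
```

At 00:13:00 the elements from 00:00:00 and 00:02:00 have fallen out of the window, which is why the last average is (6 + 4) / 2 = 5.0.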
