My team and I have been puzzling for a while over how to solve a specific
problem.

Say you have an input stream of tuples:

  <String uuid, Integer value>

And you want to output a stream containing:

  <String uuid, Double average>

Where the average is an aggregation over a 10-minute sliding window of the
"value" field.

There are three extra requirements:
A) We want to output a single result for each input tuple
B) We want to output a result as early as possible after the input arrives
(low latency)
C) We want the average value in result_i to have *seen* the value from
input_i

An illustration of the input stream with the corresponding output:

Time: 00:00:00
Input: <UUID1, 1>
Output: <UUID1, 1.0>

Time: 00:02:00
Input: <UUID2, 2>
Output: <UUID2, 1.5>

Time: 00:08:00
Input: <UUID3, 6>
Output: <UUID3, 3.0>

Time: 00:13:00
Input: <UUID4, 4>
Output: <UUID4, 5.0>
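
To make the expected semantics concrete, here is a minimal single-machine
sketch in plain Java (not Beam) that reproduces the illustration above. The
class name SlidingAverage and its add method are hypothetical, and it
assumes elements arrive in timestamp order and that the window is the
half-open interval (t - 10 min, t], neither of which is stated above.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical helper: a trailing 10-minute average over in-order input.
public class SlidingAverage {
    private static final long WINDOW_SECONDS = 10 * 60;

    // Each entry is {timestampSeconds, value}, oldest first.
    private final Deque<long[]> window = new ArrayDeque<>();
    private long sum = 0;

    // Adds the new value first, so the returned average has "seen" it
    // (requirement C), and returns exactly one result per input
    // (requirement A).
    public double add(long timestampSeconds, int value) {
        window.addLast(new long[] {timestampSeconds, value});
        sum += value;
        // Evict entries at or before (t - 10 min). The element just added
        // is never evicted, so the deque is never empty here.
        while (window.peekFirst()[0] <= timestampSeconds - WINDOW_SECONDS) {
            sum -= window.pollFirst()[1];
        }
        return (double) sum / window.size();
    }

    public static void main(String[] args) {
        SlidingAverage avg = new SlidingAverage();
        System.out.println(avg.add(0, 1));        // 00:00:00 -> 1.0
        System.out.println(avg.add(2 * 60, 2));   // 00:02:00 -> 1.5
        System.out.println(avg.add(8 * 60, 6));   // 00:08:00 -> 3.0
        System.out.println(avg.add(13 * 60, 4));  // 00:13:00 -> 5.0
    }
}
```

This only pins down the desired output; it says nothing about how to get
the same behaviour in a distributed, possibly out-of-order pipeline.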

The issue we have is that without some magic tricks and hacky code,
achieving all three extra requirements is tough. A naive solution looks
like this (Beam pseudo-code):


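// Side input: running mean over a 10-minute window sliding every second,
// re-fired on every new element.
PCollectionView<Double> agg = input
     .apply(Window.into(
             SlidingWindows.of(Duration.standardMinutes(10))
                 .every(Duration.standardSeconds(1)))
         .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))))
     .apply(Mean.globally())
     .apply(View.asSingleton());

// Main path: join each input element with the current value of the side input.
PCollection<Tuples> output = input
     .apply(ParDo.of(new Joiner()).withSideInputs(agg));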


The problem is that there's a race condition: input elements can pass
through the Joiner DoFn before the side input corresponding to that element
is present. This makes it difficult to satisfy requirements A, B, and C
listed above.

Has anyone solved a similar problem to this before? Any neat ideas?
