My team and I have been puzzling for a while over how to solve a specific problem.
Say you have an input stream of tuples:

    <String uuid, Integer value>

and you want to output a stream containing:

    <String uuid, Double average>

where the average is an aggregation of the "value" field over a 10-minute sliding window. There are a few extra requirements:

A) We want to output exactly one result for each input tuple.
B) We want to output each result as soon as possible after its input arrives (low latency).
C) We want the average in result_i to have *seen* the value from input_i.

An illustration of the input stream with the corresponding output:

    Time       Input          Output
    00:00:00   <UUID1, 1>     <UUID1, 1.0>
    00:02:00   <UUID2, 2>     <UUID2, 1.5>
    00:08:00   <UUID3, 6>     <UUID3, 3.0>
    00:13:00   <UUID4, 4>     <UUID4, 5.0>

(At 00:13:00 the values from 00:00:00 and 00:02:00 have fallen out of the 10-minute window, so the output is (6 + 4) / 2 = 5.0.)

The issue is that satisfying all three extra requirements at once is tough without some magic tricks and hacky code. A naive solution looks like this (Beam Java pseudo-code):

    PCollectionView<Double> agg = input  // input: PCollection<KV<String, Integer>>
        .apply(Window.<KV<String, Integer>>into(
                SlidingWindows.of(Duration.standardMinutes(10)).every(Duration.standardSeconds(1)))
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))  // fire per element
            .withAllowedLateness(Duration.ZERO)
            .accumulatingFiredPanes())
        .apply(Values.<Integer>create())                     // keep only the value
        .apply(Mean.<Integer>globally().asSingletonView());  // one mean per window

    PCollection<KV<String, Double>> output = input
        .apply(ParDo.of(new Joiner()).withSideInputs(agg));  // Joiner reads agg via sideInput

The problem is that there's a race condition: an input element can pass through the Joiner DoFn before the side-input value that accounts for that element has been computed. This makes it hard to satisfy requirements A, B and C above.

Has anyone solved a similar problem before? Any neat ideas?
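For reference, the expected outputs in the illustration can be reproduced with a small single-threaded model of the semantics we're after. This is a plain-Java sketch (Java 16+ for records), not Beam code; the class, record and method names are hypothetical, and it assumes inputs arrive in timestamp order and that the window for input_i is (t_i - 10 min, t_i], so it always contains input_i itself.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy, single-threaded model of the desired output (plain Java, not Beam).
// Assumptions: inputs arrive in timestamp order, and the window for input i
// is (t_i - 10 min, t_i], so it always contains input i itself.
class SlidingAverageModel {
    static final long WINDOW_SECONDS = 10 * 60;

    record Input(String uuid, long tsSeconds, int value) {}
    record Output(String uuid, double average) {}

    // Emits exactly one Output per Input (requirement A), computed only after
    // that input has been added to the window (requirement C).
    static List<Output> run(List<Input> inputs) {
        Deque<Input> window = new ArrayDeque<>();
        long sum = 0;
        List<Output> out = new ArrayList<>();
        for (Input in : inputs) {
            // Evict values whose timestamp is at or before t_i - 10 min.
            while (!window.isEmpty()
                    && window.peekFirst().tsSeconds() <= in.tsSeconds() - WINDOW_SECONDS) {
                sum -= window.pollFirst().value();
            }
            // Add input i before reading the average.
            window.addLast(in);
            sum += in.value();
            out.add(new Output(in.uuid(), (double) sum / window.size()));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Input> inputs = List.of(
                new Input("UUID1", 0, 1),     // 00:00:00
                new Input("UUID2", 120, 2),   // 00:02:00
                new Input("UUID3", 480, 6),   // 00:08:00
                new Input("UUID4", 780, 4));  // 00:13:00
        run(inputs).forEach(System.out::println);
        // Output[uuid=UUID1, average=1.0]
        // Output[uuid=UUID2, average=1.5]
        // Output[uuid=UUID3, average=3.0]
        // Output[uuid=UUID4, average=5.0]
    }
}
```

If the Joiner instead read the average before input_i had been folded in (the race described above), the 00:08:00 output would be (1 + 2) / 2 = 1.5 rather than 3.0, which is exactly a violation of requirement C.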