Hi Rahul,

Thanks for the response.

I did consider State, but I was hesitant because of a requirement I didn't mention: the same pipeline should work in both batch and streaming modes. I'm not sure how stateful DoFns behave in the batch world: can you get Beam to pass the elements through the DoFn in a desired order, e.g. sorted by event time? Most aggregations we're likely to be running are per-key rather than global, so the parallelism issue might not be such a big deal.

Regards,
Sam

On Thu, Oct 10, 2019 at 5:30 PM rahul patwari <rahulpatwari8...@gmail.com> wrote:

> Hi Sam,
>
> (Assuming all the tuples have the same key) One solution could be to use
> ParDo with State (to calculate the mean) => For each element as it occurs,
> calculate the mean (store the sum and count as the state) and emit the tuple
> with the new average value.
> But it will limit the parallelism count.
>
> Regards,
> Rahul
>
> On Thu, Oct 10, 2019 at 9:15 PM Sam Stephens <samdjsteph...@gmail.com>
> wrote:
>
>> My team and I have been puzzling for a while how to solve a specific
>> problem.
>>
>> Say you have an input stream of tuples:
>>
>> <String uuid, Integer value>
>>
>> And you want to output a stream containing:
>>
>> <String uuid, Double average>
>>
>> Where the average is an aggregation over a 10 minute sliding window of
>> the "value" field.
>>
>> There are a couple of extra requirements:
>> A) We want to output a single result for each input tuple
>> B) We want to output a result as early as possible after the input
>> arrives (low latency)
>> C) We want the average value in result_i to have *seen* the value from
>> input_i
>>
>> An illustration of the input stream with corresponding output
>>
>> Time: 00:00:00
>> Input: <UUID1, 1>
>> Output: <UUID1, 1.0>
>>
>> Time: 00:02:00
>> Input: <UUID2, 2>
>> Output: <UUID2, 1.5>
>>
>> Time: 00:08:00
>> Input: <UUID3, 6>
>> Output: <UUID3, 3.0>
>>
>> Time: 00:13:00
>> Input: <UUID4, 4>
>> Output: <UUID4, 5.0>
>>
>> The issue we have is that without some magic tricks and hacky code,
>> achieving all 3 extra requirements is tough. A naive solution looks like
>> this (beam pseudo-code):
>>
>>
>> PCollectionView<Double> agg = input
>>     .apply(Windows.sliding(10mins, 1sec hops)
>>         .trigger(Repeatedly.forever(AfterPane.elementCountAtLeast(1))))
>>     .apply(Mean.globally())
>>     .apply(View.asSingleton());
>>
>> PCollection<Tuples> output = input
>>     .apply(ParDo.of(new Joiner()).withSideInputs(agg));
>>
>>
>> The problem is that there's a race condition: input elements can pass
>> through the Joiner DoFn before the side input corresponding to that
>> element is present. This makes solving the A, B, C requirements listed
>> above difficult.
>>
>> Has anyone solved a similar problem to this before? Any neat ideas?
>>
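
For reference, here is a minimal plain-Java sketch of the per-key bookkeeping that the State-based approach would need. It is not Beam code: the class name SlidingMean and its add method are hypothetical, and it assumes elements for a key arrive in non-decreasing event-time order (which is exactly the open question for batch mode) and that the window for an element at time t is (t - 10 minutes, t]. Under those assumptions it reproduces the illustration in the quoted message.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical helper: running mean over a sliding event-time window for one key. */
class SlidingMean {
    private final long windowMillis;
    // Each entry is {timestampMillis, value}, oldest first.
    private final Deque<long[]> buffer = new ArrayDeque<>();
    private long sum = 0;

    SlidingMean(long windowMillis) {
        if (windowMillis <= 0) {
            throw new IllegalArgumentException("windowMillis must be positive");
        }
        this.windowMillis = windowMillis;
    }

    /**
     * Adds one element and returns the mean of all buffered values whose
     * timestamp lies in (timestampMillis - windowMillis, timestampMillis].
     * Assumption: calls are made in non-decreasing timestamp order.
     */
    double add(long timestampMillis, int value) {
        buffer.addLast(new long[] {timestampMillis, value});
        sum += value;
        // Evict elements that have fallen out of the window. The element just
        // added is always inside the window, so the buffer never becomes empty.
        while (buffer.peekFirst()[0] <= timestampMillis - windowMillis) {
            sum -= buffer.removeFirst()[1];
        }
        return (double) sum / buffer.size();
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        SlidingMean mean = new SlidingMean(10 * minute);
        System.out.println(mean.add(0 * minute, 1));   // 1.0
        System.out.println(mean.add(2 * minute, 2));   // 1.5
        System.out.println(mean.add(8 * minute, 6));   // 3.0
        System.out.println(mean.add(13 * minute, 4));  // 5.0
    }
}
```

Because the new value is added before the mean is computed, each result has seen its own input (requirement C), and exactly one result is produced per input (requirement A). Out-of-order input would break the eviction logic, which is why the ordering question matters.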