Looking at the naive solution

    PCollectionView<Double> agg = input
        .apply(Windows.sliding(10mins, 1sec hops).trigger(Repeatedly.forever(AfterPane.elementCountAtLeast(1))))
        .apply(Mean.globally())
        .apply(View.asSingleton());

    PCollection<Tuples> output = input
        .apply(ParDo.of(new Joiner().withSideInputs(agg)));

the constraint (C) is being violated because of your
AfterPane.elementCountAtLeast trigger, which will emit averages before
having seen all the values that should contribute to the average. It
will emit a (speculative) average as soon as the first element comes
in, and at each subsequent element, but this order may not correspond
to the order in which elements get passed into the Joiner. Removing
this trigger would hold back processing in Joiner until this element
(as well as all other elements) in the window have been seen to
produce the final average.

Your computation seems to be sensitive to the order in which elements
arrive. What would you expect the output to be for

    Time: 00:08:00
    Input: <UUID3, 6>
    Output: <UUID3, ???>

    Time: 00:13:00
    Input: <UUID4, 4>
    Output: <UUID4, ???>

    Time: 00:00:00
    Input: <UUID1, 1>
    Output: <UUID1, ???>

    Time: 00:02:00
    Input: <UUID2, 2>
    Output: <UUID2, ???>

Are you really trying to emit elements with the mean of all elements
with timestamp up to 10 minutes prior to the current value? That's a
bit different than sliding windows. In that case you could do
something with a Stateful DoFn that buffers elements and for each
incoming element sets a timer at T which then reads the buffer,
computes the output, and discards elements older than 10 minutes. You
could also possibly do this with a custom WindowFn.

On Thu, Oct 10, 2019 at 10:56 AM rahul patwari
<rahulpatwari8...@gmail.com> wrote:
>
> With Stateful DoFn, each instance of DoFn will have elements which belong to
> the same window and have the same key. So, the parallelism is limited by [no.
> of keys * no. of Windows]

On a practical note, no runner actually parallelizes across windows
(and indeed sometimes, e.g. for merging windows like sessions, it's
not actually possible).

> In batch mode, as all the elements belong to the same window, i.e. Global
> Window, the parallelism will be limited by the [no.
> of keys]. So, if you only
> have one key, only one instance of DoFn will be running.
>
> AFAIK, it is not possible to pass the elements through the DoFn in the
> desired order.

That is correct.

> On Thu, Oct 10, 2019 at 10:11 PM Sam Stephens <samdjsteph...@gmail.com> wrote:
>>
>> Hi Rahul,
>>
>> Thanks for the response.
>>
>> I did consider State, but actually I was tentative because of a different
>> requirement that I didn't specify - the same pipeline should work for batch
>> and stream modes. I'm not sure how Stateful DoFns behave in the batch
>> world: can you get Beam to pass the elements through the DoFn in a desired
>> order, e.g. sorted by event time?
>>
>> Most aggregations we're likely to be running are per-key rather than global,
>> so the parallelism issue might not be such a big deal.
>>
>> Regards,
>> Sam
>>
>> On Thu, Oct 10, 2019 at 5:30 PM rahul patwari <rahulpatwari8...@gmail.com>
>> wrote:
>>>
>>> Hi Sam,
>>>
>>> (Assuming all the tuples have the same key) One solution could be to use
>>> ParDo with State (to calculate the mean): for each element as it arrives,
>>> calculate the mean (store the sum and count as the state) and emit the tuple
>>> with the new average value.
>>> But it will limit the parallelism count.
>>>
>>> Regards,
>>> Rahul
>>>
>>> On Thu, Oct 10, 2019 at 9:15 PM Sam Stephens <samdjsteph...@gmail.com>
>>> wrote:
>>>>
>>>> My team and I have been puzzling for a while how to solve a specific
>>>> problem.
>>>>
>>>> Say you have an input stream of tuples:
>>>>
>>>> <String uuid, Integer value>
>>>>
>>>> And you want to output a stream containing:
>>>>
>>>> <String uuid, Double average>
>>>>
>>>> Where the average is an aggregation over a 10 minute sliding window of the
>>>> "value" field.
>>>>
>>>> There are a couple of extra requirements:
>>>> A) We want to output a single result for each input tuple
>>>> B) We want to output a result as early as possible after the input arrives
>>>> (low latency)
>>>> C) We want the average value in result_i to have *seen* the value from
>>>> input_i
>>>>
>>>> An illustration of the input stream with corresponding output
>>>>
>>>> Time: 00:00:00
>>>> Input: <UUID1, 1>
>>>> Output: <UUID1, 1.0>
>>>>
>>>> Time: 00:02:00
>>>> Input: <UUID2, 2>
>>>> Output: <UUID2, 1.5>
>>>>
>>>> Time: 00:08:00
>>>> Input: <UUID3, 6>
>>>> Output: <UUID3, 3.0>
>>>>
>>>> Time: 00:13:00
>>>> Input: <UUID4, 4>
>>>> Output: <UUID4, 5.0>
>>>>
>>>> The issue we have is that without some magic tricks and hacky code,
>>>> achieving all 3 extra requirements is tough. A naive solution looks like
>>>> this (beam pseudo-code):
>>>>
>>>>
>>>> PCollectionView<Double> agg = input
>>>> .apply(Windows.sliding(10mins, 1sec
>>>> hops).trigger(Repeatedly.forever(AfterPane.elementCountAtLeast(1))))
>>>> .apply(Mean.globally())
>>>> .apply(View.asSingleton());
>>>>
>>>> PCollection<Tuples> output = input
>>>> .apply(ParDo.of(new Joiner().withSideInputs(agg)));
>>>>
>>>>
>>>> The problem is that there's a race condition - input elements can pass
>>>> through the Joiner DoFn before the sideInput corresponding to that element
>>>> is present. This makes solving the A, B, C requirements listed above
>>>> difficult.
>>>>
>>>> Has anyone solved a similar problem to this before? Any neat ideas?
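
For reference, the buffer-and-evict idea discussed in this thread (keep
recent elements, compute the mean for each new element, discard anything
older than 10 minutes) can be sketched in plain Java, outside Beam. This
is only an illustration under stated assumptions: RollingMean is a
hypothetical helper and not a Beam API, elements are assumed to be fed in
event-time order (the very ordering question raised above), and the
window is taken to be (t - 10 min, t], so an element exactly 10 minutes
old is dropped.

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical helper (not a Beam API): mean over the trailing `horizon` of event time. */
class RollingMean {
    private final long horizonMillis;
    // Buffered {timestampMillis, value} pairs, oldest first.
    private final Deque<long[]> buffer = new ArrayDeque<>();
    private long sum = 0;

    RollingMean(Duration horizon) {
        if (horizon.isZero() || horizon.isNegative()) {
            throw new IllegalArgumentException("horizon must be positive");
        }
        this.horizonMillis = horizon.toMillis();
    }

    /**
     * Adds one element and returns the mean of every buffered element whose
     * timestamp lies in (timestampMillis - horizon, timestampMillis].
     * Assumes calls arrive in non-decreasing timestamp order.
     */
    double add(long timestampMillis, int value) {
        buffer.addLast(new long[] {timestampMillis, value});
        sum += value;
        // Evict elements that are `horizon` or more older than the new one.
        // The new element itself is never evicted because horizon > 0.
        while (buffer.peekFirst()[0] <= timestampMillis - horizonMillis) {
            sum -= buffer.removeFirst()[1];
        }
        return (double) sum / buffer.size();
    }

    public static void main(String[] args) {
        RollingMean mean = new RollingMean(Duration.ofMinutes(10));
        long minute = Duration.ofMinutes(1).toMillis();
        System.out.println(mean.add(0 * minute, 1));   // 1.0
        System.out.println(mean.add(2 * minute, 2));   // 1.5
        System.out.println(mean.add(8 * minute, 6));   // 3.0
        System.out.println(mean.add(13 * minute, 4));  // 5.0
    }
}
```

Fed in timestamp order, this reproduces the four outputs in the
illustration above. It does not address out-of-order arrival; in a
pipeline that part would fall to the buffering and timer logic described
in the reply.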