Hi Rahul,

Thanks for the response.

I did consider State, but I was hesitant because of a requirement I didn't
mention: the same pipeline should work in both batch and streaming modes.
I'm not sure how stateful DoFns behave in the batch world: can you get Beam
to pass the elements through the DoFn in a desired order, e.g. sorted by
event time?

Most aggregations we're likely to be running are per-key rather than
global, so the parallelism issue might not be such a big deal.
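
To make the ordering question concrete, here is a minimal plain-Java sketch
(no Beam dependency) of the per-key behaviour I have in mind: sort by event
time, keep the last 10 minutes of values per key as state, and emit one
average per input. The class, method and field names are hypothetical, the
explicit sort stands in for whatever ordering a runner might or might not
provide, and treating the window as (t - 10 min, t] is an assumption.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch, not Beam code. All names below are hypothetical.
class SlidingMeanSketch {

    // One input tuple, plus the grouping key and event time it is assumed to carry.
    static final class Event {
        final String key;
        final String uuid;
        final int value;
        final Instant eventTime;

        Event(String key, String uuid, int value, Instant eventTime) {
            this.key = key;
            this.uuid = uuid;
            this.value = value;
            this.eventTime = eventTime;
        }
    }

    // Per-key state: the events still inside the window and their running sum.
    static final class KeyState {
        final Deque<Event> inWindow = new ArrayDeque<>();
        long sum = 0;
    }

    // Returns uuid -> average in processing order, one entry per input event.
    static Map<String, Double> process(List<Event> input, Duration window) {
        // Assumption: the whole input can be sorted by event time up front.
        List<Event> sorted = new ArrayList<>(input);
        sorted.sort(Comparator.comparing((Event e) -> e.eventTime));

        Map<String, KeyState> states = new HashMap<>();
        Map<String, Double> output = new LinkedHashMap<>();
        for (Event e : sorted) {
            KeyState state = states.computeIfAbsent(e.key, k -> new KeyState());
            // Assumed window convention: (eventTime - window, eventTime].
            Instant cutoff = e.eventTime.minus(window);
            while (!state.inWindow.isEmpty()
                    && !state.inWindow.peekFirst().eventTime.isAfter(cutoff)) {
                state.sum -= state.inWindow.pollFirst().value;
            }
            // Add the current element before averaging, so the result has "seen" it.
            state.inWindow.addLast(e);
            state.sum += e.value;
            output.put(e.uuid, (double) state.sum / state.inWindow.size());
        }
        return output;
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2019-10-10T00:00:00Z");
        // The four inputs from the illustration, deliberately out of order.
        List<Event> input = List.of(
                new Event("all", "UUID3", 6, t0.plus(Duration.ofMinutes(8))),
                new Event("all", "UUID1", 1, t0),
                new Event("all", "UUID4", 4, t0.plus(Duration.ofMinutes(13))),
                new Event("all", "UUID2", 2, t0.plus(Duration.ofMinutes(2))));
        System.out.println(process(input, Duration.ofMinutes(10)));
    }
}
```

With the four inputs from the illustration quoted below, all placed under a
single hypothetical key "all", this prints
{UUID1=1.0, UUID2=1.5, UUID3=3.0, UUID4=5.0}. It says nothing about whether a
Beam runner would deliver elements to a stateful DoFn in that order, which is
the part I'm unsure about.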

Regards,
Sam

On Thu, Oct 10, 2019 at 5:30 PM rahul patwari <rahulpatwari8...@gmail.com>
wrote:

> Hi Sam,
>
> (Assuming all the tuples have the same key) One solution could be to use a
> ParDo with State to calculate the mean: for each element as it arrives,
> update the sum and count stored in the state, compute the new mean, and emit
> the tuple with the new average value.
> But it will limit parallelism, since elements with the same key are
> processed sequentially.
>
> Regards,
> Rahul
>
> On Thu, Oct 10, 2019 at 9:15 PM Sam Stephens <samdjsteph...@gmail.com>
> wrote:
>
>> My team and I have been puzzling for a while over how to solve a specific
>> problem.
>>
>> Say you have an input stream of tuples:
>>
>>   <String uuid, Integer value>
>>
>> And you want to output a stream containing:
>>
>>   <String uuid, Double average>
>>
>> Where the average is an aggregation over a 10 minute sliding window of
>> the "value" field.
>>
>> There are a couple of extra requirements:
>> A) We want to output a single result for each input tuple
>> B) We want to output a result as early as possible after the input
>> arrives (low latency)
>> C) We want the average value in result_i to have *seen* the value from
>> input_i
>>
>> An illustration of the input stream with the corresponding output:
>>
>> Time: 00:00:00
>> Input: <UUID1, 1>
>> Output: <UUID1, 1.0>
>>
>> Time: 00:02:00
>> Input: <UUID2, 2>
>> Output: <UUID2, 1.5>
>>
>> Time: 00:08:00
>> Input: <UUID3, 6>
>> Output: <UUID3, 3.0>
>>
>> Time: 00:13:00
>> Input: <UUID4, 4>
>> Output: <UUID4, 5.0>
>>
>> The issue we have is that, without some magic tricks and hacky code,
>> achieving all 3 extra requirements is tough. A naive solution looks like
>> this (Beam pseudo-code):
>>
>>
>> PCollectionView<Double> agg = input
>>      .apply(Windows.sliding(10mins, 1sec hops)
>>          .trigger(Repeatedly.forever(AfterPane.elementCountAtLeast(1))))
>>      .apply(Mean.globally())
>>      .apply(View.asSingleton());
>>
>> PCollection<Tuples> output = input
>>      .apply(ParDo.of(new Joiner()).withSideInputs(agg));
>>
>>
>> The problem is that there's a race condition: input elements can pass
>> through the Joiner DoFn before the side input corresponding to that element
>> is present. This makes satisfying requirements A, B and C listed above
>> difficult.
>>
>> Has anyone solved a similar problem to this before? Any neat ideas?
>>
>
