Looking at the naive solution:

PCollectionView<Double> agg = input
     .apply(Windows.sliding(10mins, 1sec hops)
         .trigger(Repeatedly.forever(AfterPane.elementCountAtLeast(1))))
     .apply(Mean.globally())
     .apply(View.asSingleton());

PCollection<Tuples> output = input
     .apply(ParDo.of(new Joiner()).withSideInputs(agg));

The constraint (C) is being violated because of your
AfterPane.elementCountAtLeast trigger, which will emit averages before
having seen all the values that should contribute to the average. It
will emit a (speculative) average as soon as the first element comes
in, and again at each subsequent element, but the order of those panes
may not correspond to the order in which elements get passed into the
Joiner.

Removing this trigger would hold back processing in the Joiner until
this element (as well as all the other elements in the window) has
been seen, so that only the final average is produced.
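
For reference, here's roughly how that windowing reads in actual Beam
Java once the early-firing trigger is dropped (untested sketch; it
assumes a PCollection<Double> called values holding just the tuples'
value field, and on its own it does not satisfy your A/B/C
requirements):

PCollectionView<Double> agg = values
     .apply(Window.<Double>into(
         SlidingWindows.of(Duration.standardMinutes(10))
             .every(Duration.standardSeconds(1))))
     .apply(Mean.<Double>globally().asSingletonView());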

Your computation seems to be sensitive to the order in which elements
arrive. What would you expect the output to be for

Time: 00:08:00
Input: <UUID3, 6>
Output: <UUID3, ???>

Time: 00:13:00
Input: <UUID4, 4>
Output: <UUID4, ???>

Time: 00:00:00
Input: <UUID1, 1>
Output: <UUID1, ???>

Time: 00:02:00
Input: <UUID2, 2>
Output: <UUID2, ???>

Are you really trying to emit, for each element, the mean of all
elements with timestamps up to 10 minutes prior to that element?
That's a bit different from sliding windows. In that case you could do
something with a stateful DoFn that buffers elements and, for each
incoming element, sets a timer at its timestamp T which then reads the
buffer, computes the output, and discards elements older than 10
minutes. You could also possibly do this with a custom WindowFn.
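
Untested sketch of roughly what that stateful DoFn could look like. It
assumes the input has already been keyed (say, by a single dummy key)
as KV<String, KV<String, Double>> of key -> (uuid, value); the class
and state names are made up, and it glosses over coders, allowed
lateness, and the fact that Beam keeps a single timer per key and
TimerId (truly per-element timers need extra bookkeeping in state):

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;

class TrailingMeanFn
    extends DoFn<KV<String, KV<String, Double>>, KV<String, Double>> {

  private static final Duration HORIZON = Duration.standardMinutes(10);

  // Everything seen (and not yet expired) for this key.
  @StateId("buffer")
  private final StateSpec<BagState<TimestampedValue<KV<String, Double>>>>
      bufferSpec = StateSpecs.bag();

  // Fires once the watermark passes a buffered element's timestamp.
  @TimerId("emit")
  private final TimerSpec emitSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void processElement(
      @Element KV<String, KV<String, Double>> element,
      @Timestamp Instant ts,
      @StateId("buffer") BagState<TimestampedValue<KV<String, Double>>> buffer,
      @TimerId("emit") Timer emit) {
    // Buffer the element and ask to be woken once everything that should
    // contribute to its average has (modulo lateness) been seen. Note that
    // repeated set() calls replace the single timer for this key.
    buffer.add(TimestampedValue.of(element.getValue(), ts));
    emit.set(ts);
  }

  @OnTimer("emit")
  public void onEmit(
      OnTimerContext c,
      @StateId("buffer") BagState<TimestampedValue<KV<String, Double>>> buffer) {
    Instant cutoff = c.timestamp().minus(HORIZON);
    List<TimestampedValue<KV<String, Double>>> kept = new ArrayList<>();
    double sum = 0;
    long count = 0;
    for (TimestampedValue<KV<String, Double>> tv : buffer.read()) {
      if (tv.getTimestamp().isBefore(cutoff)) {
        continue;  // older than 10 minutes: drop from the buffer
      }
      kept.add(tv);
      sum += tv.getValue().getValue();
      count++;
    }
    // One output per element whose timestamp matches this firing.
    for (TimestampedValue<KV<String, Double>> tv : kept) {
      if (tv.getTimestamp().equals(c.timestamp())) {
        c.output(KV.of(tv.getValue().getKey(), sum / count));
      }
    }
    // Rewrite the buffer without the expired entries.
    buffer.clear();
    for (TimestampedValue<KV<String, Double>> tv : kept) {
      buffer.add(tv);
    }
  }
}

The timer bookkeeping is the fiddly part: since there is only one
timer per key, a real implementation would keep the pending emit
timestamps in state and always (re)set the timer to the earliest one
still outstanding.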

On Thu, Oct 10, 2019 at 10:56 AM rahul patwari
<rahulpatwari8...@gmail.com> wrote:
>
> With Stateful DoFn, each instance of DoFn will have elements which belong to 
> the same window and have the same key. So, the parallelism is limited by [no. 
> of keys * no. of Windows]

On a practical note, no runner actually parallelizes across windows
(and indeed sometimes, e.g. for merging windows like sessions, it's
not actually possible).

> In batch mode, as all the elements belong to the same window, i.e. Global 
> Window, the parallelism will be limited by the [no. of keys]. So, if you only 
> have one key, only one instance of DoFn will be running.
>
> AFAIK, it is not possible to pass the elements through the DoFn in the 
> desired order.

That is correct.

> On Thu, Oct 10, 2019 at 10:11 PM Sam Stephens <samdjsteph...@gmail.com> wrote:
>>
>> Hi Rahul,
>>
>> Thanks for the response.
>>
>> I did consider State, but actually I was tentative because of a different 
>> requirement that I didn't specify - the same pipeline should work for batch 
>> and stream modes. I'm not sure how Stateful DoFn's behave in the batch 
>> world: can you get Beam to pass the elements through the DoFn in a desired 
>> order, e.g. sorted by event time?
>>
>> Most aggregations we're likely to be running are per-key rather than global, 
>> so the parallelism issue might not be such a big deal.
>>
>> Regards,
>> Sam
>>
>> On Thu, Oct 10, 2019 at 5:30 PM rahul patwari <rahulpatwari8...@gmail.com> 
>> wrote:
>>>
>>> Hi Sam,
>>>
>>> (Assuming all the tuples have the same key) One solution could be to use 
>>> ParDo with State(to calculate mean) => For each element as they occur, 
>>> calculate the Mean(store the sum and count as the state) and emit the tuple 
>>> with the new average value.
>>> But it will limit the parallelism count.
>>>
>>> Regards,
>>> Rahul
>>>
>>> On Thu, Oct 10, 2019 at 9:15 PM Sam Stephens <samdjsteph...@gmail.com> 
>>> wrote:
>>>>
>>>> My team and I have been puzzling for a while how to solve a specific 
>>>> problem.
>>>>
>>>> Say you have an input stream of tuples:
>>>>
>>>>   <String uuid, Integer value>
>>>>
>>>> And you want to output a stream containing:
>>>>
>>>>   <String uuid, Double average>
>>>>
>>>> Where the average is an aggregation over a 10 minute sliding window of the 
>>>> "value" field.
>>>>
>>>> There are a couple of extra requirements:
>>>> A) We want to output a single result for each input tuple
>>>> B) We want to output a result as early as possible after the input arrives 
>>>> (low latency)
>>>> C) We want the average value in result_i to have *seen* the value from 
>>>> input_i
>>>>
>>>> An illustration of the input stream with corresponding output
>>>>
>>>> Time: 00:00:00
>>>> Input: <UUID1, 1>
>>>> Output: <UUID1, 1.0>
>>>>
>>>> Time: 00:02:00
>>>> Input: <UUID2, 2>
>>>> Output: <UUID2, 1.5>
>>>>
>>>> Time: 00:08:00
>>>> Input: <UUID3, 6>
>>>> Output: <UUID3, 3.0>
>>>>
>>>> Time: 00:13:00
>>>> Input: <UUID4, 4>
>>>> Output: <UUID4, 5.0>
>>>>
>>>> The issue we have is that without some magic tricks and hacky code, 
>>>> achieving all 3 extra requirements is tough.  A naive solution looks like 
>>>> this (beam pseudo-code):
>>>>
>>>>
>>>> PCollectionView<Double> agg = input
>>>>      .apply(Windows.sliding(10mins, 1sec hops)
>>>>          .trigger(Repeatedly.forever(AfterPane.elementCountAtLeast(1))))
>>>>      .apply(Mean.globally())
>>>>      .apply(View.asSingleton());
>>>>
>>>> PCollection<Tuples> output = input
>>>>      .apply(ParDo.of(new Joiner()).withSideInputs(agg));
>>>>
>>>>
>>>> The problem is that there's a race condition - input elements can pass
>>>> through the Joiner DoFn before the sideInput corresponding to that element 
>>>> is present. This makes solving the A, B, C requirements listed above 
>>>> difficult.
>>>>
>>>> Has anyone solved a similar problem to this before? Any neat ideas?
