With a Stateful DoFn, state is scoped per key and window, so each instance
of the DoFn only sees elements which belong to the same window and have the
same key. So, the parallelism is limited by [no. of keys * no. of windows].
In batch mode, as all the elements belong to the same window, i.e. the
Global Window, the parallelism will be limited by the [no. of keys]. So, if
you only have one key, only one instance of the DoFn will be running.

AFAIK, it is not possible to pass the elements through the DoFn in the
desired order.
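
For what it's worth, here is a minimal plain-Java sketch (no Beam API
involved) of the per-key logic such a DoFn would have to keep in state for
the 10 minute average. Unlike a plain sum and count, it also buffers the
elements so that expired ones can be subtracted again. The names
SlidingMeanSketch and add are hypothetical, the window is assumed to be
(t - 10 min, t], and the sketch assumes the elements arrive in event-time
order, which is exactly the part Beam does not guarantee.

```java
import java.util.ArrayDeque;

/** Hypothetical sketch: running mean over a sliding time window for one key. */
final class SlidingMeanSketch {
    private final long windowSec;
    // Buffered elements as {eventTimeSec, value}, oldest first.
    private final ArrayDeque<long[]> buffer = new ArrayDeque<>();
    private long sum = 0;

    SlidingMeanSketch(long windowSec) {
        this.windowSec = windowSec;
    }

    /**
     * Adds one element and returns the mean over (timeSec - windowSec, timeSec].
     * Assumes timeSec never decreases between calls.
     */
    double add(long timeSec, int value) {
        // Evict elements that have fallen out of the window.
        while (!buffer.isEmpty() && buffer.peekFirst()[0] <= timeSec - windowSec) {
            sum -= buffer.pollFirst()[1];
        }
        buffer.addLast(new long[] {timeSec, value});
        sum += value;
        // The result always includes the element that was just added.
        return (double) sum / buffer.size();
    }

    public static void main(String[] args) {
        SlidingMeanSketch mean = new SlidingMeanSketch(600); // 10 minutes
        System.out.println(mean.add(0, 1));   // 00:00:00
        System.out.println(mean.add(120, 2)); // 00:02:00
        System.out.println(mean.add(480, 6)); // 00:08:00
        System.out.println(mean.add(780, 4)); // 00:13:00
    }
}
```

Fed with the four inputs from the illustration in the original mail, this
gives 1.0, 1.5, 3.0 and 5.0, one result per input, and each result has seen
its own input.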

On Thu, Oct 10, 2019 at 10:11 PM Sam Stephens <samdjsteph...@gmail.com>
wrote:

> Hi Rahul,
>
> Thanks for the response.
>
> I did consider State, but actually I was tentative because of a different
> requirement that I didn't specify - the same pipeline should work for batch
> and stream modes. I'm not sure how Stateful DoFns behave in the batch
> world: can you get Beam to pass the elements through the DoFn in a desired
> order, e.g. sorted by event time?
>
> Most aggregations we're likely to be running are per-key rather than
> global, so the parallelism issue might not be such a big deal.
>
> Regards,
> Sam
>
> On Thu, Oct 10, 2019 at 5:30 PM rahul patwari <rahulpatwari8...@gmail.com>
> wrote:
>
>> Hi Sam,
>>
>> (Assuming all the tuples have the same key) One solution could be to use
>> ParDo with State (to calculate the mean): for each element as it arrives,
>> calculate the mean (storing the sum and count as the state) and emit the
>> tuple with the new average value.
>> But it will limit the parallelism.
>>
>> Regards,
>> Rahul
>>
>> On Thu, Oct 10, 2019 at 9:15 PM Sam Stephens <samdjsteph...@gmail.com>
>> wrote:
>>
>>> My team and I have been puzzling for a while over how to solve a
>>> specific problem.
>>>
>>> Say you have an input stream of tuples:
>>>
>>>   <String uuid, Integer value>
>>>
>>> And you want to output a stream containing:
>>>
>>>   <String uuid, Double average>
>>>
>>> Where the average is an aggregation over a 10 minute sliding window of
>>> the "value" field.
>>>
>>> There are a couple of extra requirements:
>>> A) We want to output a single result for each input tuple
>>> B) We want to output a result as early as possible after the input
>>> arrives (low latency)
>>> C) We want the average value in result_i to have *seen* the value from
>>> input_i
>>>
>>> An illustration of the input stream with corresponding output
>>>
>>> Time: 00:00:00
>>> Input: <UUID1, 1>
>>> Output: <UUID1, 1.0>
>>>
>>> Time: 00:02:00
>>> Input: <UUID2, 2>
>>> Output: <UUID2, 1.5>
>>>
>>> Time: 00:08:00
>>> Input: <UUID3, 6>
>>> Output: <UUID3, 3.0>
>>>
>>> Time: 00:13:00
>>> Input: <UUID4, 4>
>>> Output: <UUID4, 5.0>
>>>
>>> The issue we have is that without some magic tricks and hacky code,
>>> achieving all 3 extra requirements is tough.  A naive solution looks like
>>> this (beam pseudo-code):
>>>
>>>
>>> PCollectionView<Double> agg = input
>>>      .apply(Windows.sliding(10mins, 1sec hops)
>>>          .trigger(Repeatedly.forever(AfterPane.elementCountAtLeast(1))))
>>>      .apply(Mean.globally())
>>>      .apply(View.asSingleton());
>>>
>>> PCollection<Tuples> output = input
>>>      .apply(ParDo.of(new Joiner()).withSideInputs(agg));
>>>
>>>
>>> The problem is that there's a race condition: input elements can pass
>>> through the Joiner DoFn before the side input corresponding to that
>>> element is present. This makes solving the A, B, C requirements listed
>>> above difficult.
>>>
>>> Has anyone solved a similar problem to this before? Any neat ideas?
>>>
>>
