This seems like a great example of a use for a stateful DoFn. It has essentially the
same structure as the example on the Beam blog, but is more meaningful.
Kenn
On Fri, Oct 11, 2019 at 12:38 PM Robert Bradshaw
wrote:
> OK, the only way to do this would be via a non-deterministic stateful
> DoFn that buff
OK, the only way to do this would be via a non-deterministic stateful
DoFn that buffers elements as they come in and computes averages by
looking at the buffer each time.
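Robert's buffering approach can be sketched in plain Java, without a Beam dependency, as the per-key state such a DoFn might hold. This is a minimal sketch under stated assumptions: the class name `SlidingAverageBuffer` is hypothetical, timestamps are epoch milliseconds, and in an actual Beam DoFn the list would be a `BagState` cell (and would likely need a timer to clear it), not an `ArrayList`. On each arrival it averages the buffered values whose timestamps fall in the 10 minutes ending at the new element's timestamp, then evicts entries too old for any in-order arrival to need. Because each output depends on which elements have already arrived (and which have been evicted), the result varies with arrival order, which is presumably the non-determinism Robert refers to.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of the per-key buffer a stateful DoFn could keep.
// In Beam the list would be a BagState; here it is an ordinary ArrayList.
class SlidingAverageBuffer {
  // One buffered element: its event timestamp (epoch millis) and its value.
  private static final class Entry {
    final long timestampMillis;
    final double value;

    Entry(long timestampMillis, double value) {
      this.timestampMillis = timestampMillis;
      this.value = value;
    }
  }

  private final long windowMillis;
  private final List<Entry> buffer = new ArrayList<>();
  private long maxTimestampMillis = Long.MIN_VALUE;

  SlidingAverageBuffer(long windowMillis) {
    this.windowMillis = windowMillis;
  }

  // Buffers the element, then returns the mean of every buffered value whose
  // timestamp lies in (timestampMillis - windowMillis, timestampMillis].
  double add(long timestampMillis, double value) {
    buffer.add(new Entry(timestampMillis, value));
    maxTimestampMillis = Math.max(maxTimestampMillis, timestampMillis);

    double sum = 0;
    int count = 0; // always >= 1: the new element lies in its own window
    for (Entry e : buffer) {
      if (e.timestampMillis > timestampMillis - windowMillis
          && e.timestampMillis <= timestampMillis) {
        sum += e.value;
        count++;
      }
    }

    // Evict entries that no in-order arrival can need again. A late element
    // may therefore see fewer values: another source of order dependence.
    long cutoff = maxTimestampMillis - windowMillis;
    buffer.removeIf(e -> e.timestampMillis <= cutoff);
    return sum / count;
  }
}
```

For example, with a 10-minute window, values 10, 20 and 30 stamped at 0, 5 and 10 minutes yield averages 10, 15 and 25. A late value 40 stamped at 7.5 minutes then yields 30: the value at 10 minutes is excluded as newer, and the value at 0 minutes, which would fall inside its window, has already been evicted.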
This could also be represented with an extension to window merging and
a join, where the trigger would be explicitly used to co
On 2019/10/10 18:23:46, Eugene Kirpichov wrote:
> " input elements can pass through the Joiner DoFn before the sideInput
> corresponding to that element is present"
>
> I don't think this is correct. Runners will evaluate a DoFn with side
> inputs on elements in a given window only after all sid
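Looking at the naive solution:
// assumes input is a PCollection<Double> of the "value" field
PCollectionView<Double> agg = input
    .apply(Window.<Double>into(
            SlidingWindows.of(Duration.standardMinutes(10))
                .every(Duration.standardSeconds(1)))
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
        .withAllowedLateness(Duration.ZERO)
        .accumulatingFiredPanes())
    .apply(Mean.<Double>globally().withoutDefaults())
    .apply(View.asSingleton());
PCollection output = input
    .apply(ParDo.of(new Joiner()).withSideInputs(agg));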
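To see what the naive solution asks of the runner, it helps to count how many windows a single element lands in. The sketch below is intended to mirror the start-time arithmetic of sliding windows with zero offset, in plain Java rather than by calling Beam; the class and method names are hypothetical. With a 10-minute size and a 1-second period, every element belongs to 600 overlapping windows, so under a per-element trigger each arrival can make all 600 of them fire.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: lists the start times of every sliding window
// [start, start + sizeMillis) that contains timestamp t (zero offset assumed).
class SlidingWindowAssignment {
  static List<Long> windowStarts(long t, long sizeMillis, long periodMillis) {
    List<Long> starts = new ArrayList<>();
    // Latest window start at or before t that is a multiple of the period.
    long lastStart = t - Math.floorMod(t, periodMillis);
    // Walk back one period at a time while the window still covers t.
    for (long start = lastStart; start > t - sizeMillis; start -= periodMillis) {
      starts.add(start);
    }
    return starts;
  }

  public static void main(String[] args) {
    long tenMinutes = 10 * 60 * 1000L;
    long oneSecond = 1000L;
    List<Long> starts = windowStarts(123_456L, tenMinutes, oneSecond);
    // prints "600 windows, newest starts at 123000"
    System.out.println(starts.size() + " windows, newest starts at " + starts.get(0));
  }
}
```

In general the count is size / period whenever the period divides the size evenly, so shrinking the hop multiplies the per-element work proportionally.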
" input elements can pass through the Joiner DoFn before the sideInput
corresponding to that element is present"
I don't think this is correct. Runners will evaluate a DoFn with side
inputs on elements in a given window only after all side inputs are ready
(have triggered at least once) in this wi
With a stateful DoFn, state is partitioned by key and window: all elements
that share the same key and the same window are processed against a single
piece of state. So the parallelism is limited to [no. of keys * no. of windows].
In batch mode, as all the elements belong to the same window (i.e. the global
window), the parallelism will be limited by t
Hi Rahul,
Thanks for the response.
I did consider State, but I was hesitant because of a different
requirement that I didn't specify: the same pipeline should work in both batch
and streaming modes. I'm not sure how stateful DoFns behave in the batch
world. Can you get Beam to pass the eleme
Hi Sam,
(Assuming all the tuples have the same key) One solution could be to use a
ParDo with state (to calculate the mean): for each element as it arrives,
update the mean (store the sum and count as state) and emit the tuple
with the new average value.
But this limits parallelism, since all elements with the same key are
processed against the same state.
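Rahul's suggestion can be modelled in plain Java as per-key running state. In this minimal sketch, `RunningMeanState` is a hypothetical name; a Beam stateful DoFn would instead keep the sum and count in two `ValueState` cells scoped to a key and window. Each element updates its key's sum and count, and the new mean is returned so it can be emitted alongside the element. Note that, as described, it averages everything seen so far for the key rather than only the last 10 minutes, so on its own it does not meet the sliding-window requirement.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java model of the state Rahul describes: a sum and a
// count per key. In a Beam stateful DoFn these would be two ValueState cells,
// each scoped to one key and window.
class RunningMeanState {
  private final Map<String, Double> sums = new HashMap<>();
  private final Map<String, Long> counts = new HashMap<>();

  // Folds one value into its key's state and returns the updated mean,
  // i.e. the average to emit alongside the incoming tuple.
  double add(String key, double value) {
    double sum = sums.merge(key, value, Double::sum);
    long count = counts.merge(key, 1L, Long::sum);
    return sum / count;
  }

  public static void main(String[] args) {
    RunningMeanState state = new RunningMeanState();
    double[] values = {2.0, 4.0, 9.0};
    for (double v : values) {
      // prints means 2.0, 3.0 and 5.0 in turn
      System.out.println("(k, " + v + ") -> mean " + state.add("k", v));
    }
  }
}
```

Storing the sum and count rather than the mean itself keeps each update constant-time and exact to recombine, which is why it is the usual choice for this kind of state.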
My team and I have been puzzling for a while over how to solve a specific
problem.
Say you have an input stream of tuples:
And you want to output a stream containing:
Where the average is an aggregation over a 10-minute sliding window of the
"value" field.
There are a couple of extra require