Re: Stateful ParDo on Non-Keyed PCollection

2019-07-25 Thread Robert Bradshaw
On Thu, Jul 25, 2019 at 6:34 PM rahul patwari
 wrote:
>
> So, if an RPC call has to be performed for a batch of Rows
> (PCollection<Row>) instead of for each Row, is the recommended way to
> batch the Rows per bundle, starting a new batch in startBundle() and
> flushing it in finishBundle() of the DoFn
> (https://stackoverflow.com/questions/49094781/yield-results-in-finish-bundle-from-a-custom-dofn/49101711#49101711)?

Yes.
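
The per-bundle pattern can be sketched in plain Java. This is a simplified model, not Beam's DoFn API: `BundleBatcher`, `maxBatchSize` and the `rpc` callback are hypothetical names, and the three methods only mirror the roles of startBundle(), processElement() and finishBundle(). Note that a batch never spans bundles, so small bundles (common in streaming) mean small batches. In a real DoFn, emitting results from finishBundle() also requires supplying a timestamp and window, which this model leaves out because the RPC here is only a side effect.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of per-bundle batching; NOT the Beam API. In a real DoFn
// these methods would carry @StartBundle, @ProcessElement and @FinishBundle.
class BundleBatcher<T> {
    private final int maxBatchSize;
    private final Consumer<List<T>> rpc; // hypothetical stand-in for the batched RPC
    private List<T> buffer = new ArrayList<>();

    BundleBatcher(int maxBatchSize, Consumer<List<T>> rpc) {
        this.maxBatchSize = maxBatchSize;
        this.rpc = rpc;
    }

    // Start of a bundle: begin with an empty batch.
    void startBundle() {
        buffer = new ArrayList<>();
    }

    // Per element: buffer it and send the batch as soon as it is full.
    void processElement(T element) {
        buffer.add(element);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    // End of a bundle: send the partial batch so nothing is left buffered
    // in the instance once the bundle is over.
    void finishBundle() {
        flush();
    }

    private void flush() {
        if (!buffer.isEmpty()) {
            rpc.accept(buffer);
            buffer = new ArrayList<>();
        }
    }
}

class BundleBatcherDemo {
    public static void main(String[] args) {
        BundleBatcher<String> batcher =
            new BundleBatcher<>(2, batch -> System.out.println("RPC with " + batch));
        batcher.startBundle();
        for (String row : Arrays.asList("r1", "r2", "r3")) {
            batcher.processElement(row);
        }
        batcher.finishBundle(); // prints "RPC with [r1, r2]", then "RPC with [r3]"
    }
}
```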

> I thought Stateful and Timely Processing could be helpful here.

The upside of stateful processing is that you can persist state across
bundles (which is especially helpful when bundles are small, e.g. for
streaming pipelines). The downside is that state is scoped per key, so you
can't share it across keys (and it also forces a shuffle to colocate the
data by key).

If you get to choose your keys, you would want to have about as many
keys as you have concurrent bundles (or some small multiple, to ensure
they're not lumpily distributed). Keying by something like
System.identityHashCode(this) in the body of a DoFn might be
sufficient.
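
A plain-Java model of keying by System.identityHashCode(this): each `InstanceKeyer` object stands in for one DoFn instance, so the elements end up with roughly as many distinct keys as there are instances doing the keying, rather than one key per element. `InstanceKeyer` is a hypothetical name and this is not Beam code; in a pipeline the pair would be a KV emitted ahead of the stateful DoFn. Identity hash codes are not guaranteed to be unique, so two instances can occasionally share a key, which only means their elements share state.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Plain-Java model, NOT the Beam API: each InstanceKeyer plays the role of one
// DoFn instance that keys its elements by System.identityHashCode(this).
class InstanceKeyer<T> {
    // Pair the element with this instance's identity hash as its key.
    Map.Entry<Integer, T> key(T element) {
        return new AbstractMap.SimpleImmutableEntry<>(System.identityHashCode(this), element);
    }
}

class InstanceKeyerDemo {
    public static void main(String[] args) {
        // Three "concurrent instances" keying 30 elements round-robin.
        List<InstanceKeyer<Integer>> instances = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            instances.add(new InstanceKeyer<>());
        }
        Set<Integer> keys = new HashSet<>();
        for (int e = 0; e < 30; e++) {
            keys.add(instances.get(e % 3).key(e).getKey());
        }
        // At most 3 distinct keys (fewer only if identity hashes collide).
        System.out.println(keys.size() + " distinct keys for 30 elements");
    }
}
```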

> On Thu, Jul 25, 2019 at 9:54 PM Robert Bradshaw  wrote:
>>
>> Though it's not obvious in the name, Stateful ParDos can only be
>> applied to keyed PCollections, similar to GroupByKey. (You could,
>> however, assign every element to the same key and then apply a
>> Stateful DoFn, though in that case all elements would get processed on
>> the same worker.)


Re: Stateful ParDo on Non-Keyed PCollection

2019-07-25 Thread rahul patwari
Yes. But GroupIntoBatches works on KVs, and we are working with
PCollection<Row> throughout our pipeline.
We can convert each Row to a KV, but we only have a few keys and a bounded
PCollection. Since we use the global window and have only a few keys, the
parallelism available to stateful (per-key, per-window) processing is limited
to the number of keys.
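
The limit described here can be made concrete with a plain-Java model (not Beam code; `StateCells` and the window labels are hypothetical). A stateful ParDo keeps one state cell per (key, window) pair and processes the elements of a cell serially, so the number of distinct pairs bounds how many cells can make progress in parallel. With the global window, that is simply the number of keys; with a single constant key it drops to 1.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java model, NOT the Beam API.
class StateCells {
    static final String GLOBAL_WINDOW = "GlobalWindow"; // hypothetical window label

    // keys.get(i) and windows.get(i) are the key and window of element i.
    // Returns the number of distinct (key, window) state cells.
    static int count(List<?> keys, List<?> windows) {
        if (keys.size() != windows.size()) {
            throw new IllegalArgumentException("one key and one window per element");
        }
        Set<List<Object>> cells = new HashSet<>();
        for (int i = 0; i < keys.size(); i++) {
            cells.add(Arrays.<Object>asList(keys.get(i), windows.get(i)));
        }
        return cells.size();
    }
}

class StateCellsDemo {
    public static void main(String[] args) {
        List<String> keys = new ArrayList<>();
        List<String> windows = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            keys.add("key-" + (i % 3));            // only a few keys
            windows.add(StateCells.GLOBAL_WINDOW); // bounded input, global window
        }
        System.out.println(StateCells.count(keys, windows)); // 3
    }
}
```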

On Thu, Jul 25, 2019 at 10:08 PM Reuven Lax  wrote:

> Have you looked at the GroupIntoBatches transform?


Re: Stateful ParDo on Non-Keyed PCollection

2019-07-25 Thread Reuven Lax
Have you looked at the GroupIntoBatches transform?
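
For reference, the behaviour of GroupIntoBatches can be modelled in plain Java. This is a simplified sketch of its semantics for a single window, not Beam's implementation; `KeyedBatcher` and `endOfWindow()` are hypothetical names. As in Beam, the input must be keyed, each key has its own buffer, a batch is emitted when a key's buffer reaches the batch size, and any remainder is emitted when the window closes (for a bounded input in the global window, at the end of the input). Because the buffers are per key, it has the same per-key parallelism limit as any stateful processing.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of GroupIntoBatches semantics within ONE window; NOT Beam code.
class KeyedBatcher<K, V> {
    private final int batchSize;
    private final Map<K, List<V>> buffers = new LinkedHashMap<>(); // one buffer per key
    private final List<Map.Entry<K, List<V>>> output = new ArrayList<>();

    KeyedBatcher(int batchSize) {
        this.batchSize = batchSize;
    }

    // Buffer a keyed element; emit that key's batch once it is full.
    void add(K key, V value) {
        List<V> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        buffer.add(value);
        if (buffer.size() >= batchSize) {
            output.add(new AbstractMap.SimpleImmutableEntry<>(key, buffer));
            buffers.remove(key);
        }
    }

    // Window end: emit every partially filled buffer.
    void endOfWindow() {
        for (Map.Entry<K, List<V>> e : buffers.entrySet()) {
            output.add(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), e.getValue()));
        }
        buffers.clear();
    }

    List<Map.Entry<K, List<V>>> output() {
        return output;
    }
}

class KeyedBatcherDemo {
    public static void main(String[] args) {
        KeyedBatcher<String, Integer> batcher = new KeyedBatcher<>(2);
        batcher.add("a", 1);
        batcher.add("b", 2);
        batcher.add("a", 3);   // "a" is full: emits a=[1, 3]
        batcher.add("a", 4);
        batcher.endOfWindow(); // emits b=[2] and a=[4]
        System.out.println(batcher.output()); // [a=[1, 3], b=[2], a=[4]]
    }
}
```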

On Thu, Jul 25, 2019 at 9:34 AM rahul patwari 
wrote:

> So, if an RPC call has to be performed for a batch of Rows
> (PCollection<Row>) instead of for each Row, is the recommended way to
> batch the Rows per bundle, starting a new batch in startBundle() and
> flushing it in finishBundle() of the DoFn
> (https://stackoverflow.com/questions/49094781/yield-results-in-finish-bundle-from-a-custom-dofn/49101711#49101711)?
> I thought Stateful and Timely Processing could be helpful here.


Re: Stateful ParDo on Non-Keyed PCollection

2019-07-25 Thread rahul patwari
So, if an RPC call has to be performed for a batch of Rows
(PCollection<Row>) instead of for each Row, is the recommended way to
batch the Rows per bundle, starting a new batch in startBundle() and
flushing it in finishBundle() of the DoFn
(https://stackoverflow.com/questions/49094781/yield-results-in-finish-bundle-from-a-custom-dofn/49101711#49101711)?
I thought Stateful and Timely Processing could be helpful here.

On Thu, Jul 25, 2019 at 9:54 PM Robert Bradshaw  wrote:

> Though it's not obvious in the name, Stateful ParDos can only be
> applied to keyed PCollections, similar to GroupByKey. (You could,
> however, assign every element to the same key and then apply a
> Stateful DoFn, though in that case all elements would get processed on
> the same worker.)


Re: Stateful ParDo on Non-Keyed PCollection

2019-07-25 Thread Robert Bradshaw
Though it's not obvious in the name, Stateful ParDos can only be
applied to keyed PCollections, similar to GroupByKey. (You could,
however, assign every element to the same key and then apply a
Stateful DoFn, though in that case all elements would get processed on
the same worker.)

On Thu, Jul 25, 2019 at 6:06 PM rahul patwari
 wrote:
>
> Hi,
>
> https://beam.apache.org/blog/2017/02/13/stateful-processing.html gives an
> example of assigning an arbitrary-but-consistent index to each element on a
> per key-and-window basis.
>
> If the Stateful ParDo is applied to a non-keyed PCollection, say, a
> PCollection with fixed windows, is the state maintained per window, with
> every element in the window assigned a consistent index?
> Does this mean every element belonging to the window will be processed in a
> single DoFn instance, limiting performance, when the work could otherwise
> have been done in multiple parallel instances?
> Similarly, how does a Stateful ParDo behave on a bounded non-keyed PCollection?
>
> Thanks,
> Rahul


Stateful ParDo on Non-Keyed PCollection

2019-07-25 Thread rahul patwari
Hi,

https://beam.apache.org/blog/2017/02/13/stateful-processing.html  gives an
example of assigning an arbitrary-but-consistent index to each element on a
per key-and-window basis.
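
The blog post's index-assigning example keeps a counter in state for each key and window. A plain-Java model of that idea (not the Beam API; `IndexAssigner` is a hypothetical name) shows why the indices are arbitrary, since they follow processing order, yet consistent, since each (key, window) pair gets its own dense sequence starting at 0.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model, NOT the Beam API: one counter per (key, window) pair,
// playing the role of the per-key-and-window ValueState in a stateful DoFn.
class IndexAssigner {
    private final Map<List<Object>, Integer> nextIndex = new HashMap<>();

    // Returns the next index for this element's (key, window) pair.
    int assign(Object key, Object window) {
        List<Object> cell = Arrays.asList(key, window);
        int current = nextIndex.getOrDefault(cell, 0);
        nextIndex.put(cell, current + 1);
        return current;
    }
}

class IndexAssignerDemo {
    public static void main(String[] args) {
        IndexAssigner assigner = new IndexAssigner();
        System.out.println(assigner.assign("k1", "w1")); // 0
        System.out.println(assigner.assign("k1", "w1")); // 1
        System.out.println(assigner.assign("k2", "w1")); // 0: different key
        System.out.println(assigner.assign("k1", "w2")); // 0: different window
    }
}
```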

If the Stateful ParDo is applied to a non-keyed PCollection, say, a
PCollection with fixed windows, is the state maintained per window, with
every element in the window assigned a consistent index?
Does this mean every element belonging to the window will be processed in a
single DoFn instance, limiting performance, when the work could otherwise
have been done in multiple parallel instances?
Similarly, how does a Stateful ParDo behave on a bounded non-keyed PCollection?

Thanks,
Rahul