Re: Stateful ParDo on Non-Keyed PCollection
On Thu, Jul 25, 2019 at 6:34 PM rahul patwari wrote:
>
> So, If an RPC call has to be performed for a batch of Rows(PCollection),
> instead of each Row, the recommended way is to batch the Rows in
> startBundle() of
> DoFn(https://stackoverflow.com/questions/49094781/yield-results-in-finish-bundle-from-a-custom-dofn/49101711#49101711)?

Yes.

> I thought Stateful and Timely Processing could be helpful here.

The upside is that you can persist state across bundles (which is
especially helpful when bundles are small, e.g. for streaming
pipelines). The downside is that you can't persist state across keys
(and it also enforces a shuffle to colocate the data by key). If you get
to choose your keys, you would want to have about as many keys as you
have concurrent bundles (or some small multiple, to ensure they're not
lumpily distributed). Keying by something like
System.identityHashCode(this) in the body of a DoFn might be sufficient.

> On Thu, Jul 25, 2019 at 9:54 PM Robert Bradshaw wrote:
>>
>> Though it's not obvious in the name, Stateful ParDos can only be
>> applied to keyed PCollections, similar to GroupByKey. (You could,
>> however, assign every element to the same key and then apply a
>> Stateful DoFn, though in that case all elements would get processed on
>> the same worker.)
>>
>> On Thu, Jul 25, 2019 at 6:06 PM rahul patwari wrote:
>> >
>> > Hi,
>> >
>> > https://beam.apache.org/blog/2017/02/13/stateful-processing.html gives an
>> > example of assigning an arbitrary-but-consistent index to each element on
>> > a per key-and-window basis.
>> >
>> > If the Stateful ParDo is applied on a Non-Keyed PCollection, say,
>> > PCollection with Fixed Windows, the state is maintained per window
>> > and every element in the window will be assigned a consistent index?
>> > Does this mean every element belonging to the window will be processed in
>> > a single DoFn Instance, which otherwise could have been done in multiple
>> > parallel instances, limiting performance?
>> > Similarly, How does Stateful ParDo behave on Bounded Non-Keyed PCollection?
>> >
>> > Thanks,
>> > Rahul
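The advice above about choosing roughly as many keys as there are concurrent bundles can be illustrated with a small plain-Java sketch. It is not Beam code: `ShardKeys`, `shardFor`, `groupByShard` and `numShards` are hypothetical names, and it derives the synthetic key from the element's hash rather than from System.identityHashCode(this) as suggested in the message, purely so the result is deterministic and easy to inspect.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain Java, NOT the Beam API. All names here are hypothetical.
class ShardKeys {
    // Maps an element to one of numShards synthetic keys (0 .. numShards-1).
    // floorMod keeps the result non-negative even for negative hash codes.
    static int shardFor(Object element, int numShards) {
        return Math.floorMod(element.hashCode(), numShards);
    }

    // Groups elements by synthetic key, the way a shuffle by key would
    // colocate them before a stateful step.
    static <T> Map<Integer, List<T>> groupByShard(List<T> elements, int numShards) {
        Map<Integer, List<T>> groups = new LinkedHashMap<>();
        for (T element : elements) {
            groups.computeIfAbsent(shardFor(element, numShards), k -> new ArrayList<>())
                  .add(element);
        }
        return groups;
    }
}
```

With `numShards` set to a small multiple of the expected number of concurrent bundles, each group is an independent unit of per-key state, so the number of groups bounds the available parallelism.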
Re: Stateful ParDo on Non-Keyed PCollection
Yes. But GroupIntoBatches works on KV. We are working on PCollection<Row>
throughout our pipeline.
We can convert Row to KV. But we only have a few keys and a Bounded
PCollection. As we have Global windows and a few keys, the opportunity for
parallelism is limited to [No. of keys] with Stateful ParDo [per Key, per
Window] processing.

On Thu, Jul 25, 2019 at 10:08 PM Reuven Lax wrote:

> Have you looked at the GroupIntoBatches transform?
>
> On Thu, Jul 25, 2019 at 9:34 AM rahul patwari wrote:
>
>> So, If an RPC call has to be performed for a batch of
>> Rows(PCollection), instead of each Row, the recommended way is to
>> batch the Rows in startBundle() of DoFn(
>> https://stackoverflow.com/questions/49094781/yield-results-in-finish-bundle-from-a-custom-dofn/49101711#49101711)?
>> I thought Stateful and Timely Processing could be helpful here.
>>
>> On Thu, Jul 25, 2019 at 9:54 PM Robert Bradshaw wrote:
>>
>>> Though it's not obvious in the name, Stateful ParDos can only be
>>> applied to keyed PCollections, similar to GroupByKey. (You could,
>>> however, assign every element to the same key and then apply a
>>> Stateful DoFn, though in that case all elements would get processed on
>>> the same worker.)
>>>
>>> On Thu, Jul 25, 2019 at 6:06 PM rahul patwari wrote:
>>> >
>>> > Hi,
>>> >
>>> > https://beam.apache.org/blog/2017/02/13/stateful-processing.html
>>> > gives an example of assigning an arbitrary-but-consistent index to each
>>> > element on a per key-and-window basis.
>>> >
>>> > If the Stateful ParDo is applied on a Non-Keyed PCollection, say,
>>> > PCollection with Fixed Windows, the state is maintained per window and
>>> > every element in the window will be assigned a consistent index?
>>> > Does this mean every element belonging to the window will be processed
>>> > in a single DoFn Instance, which otherwise could have been done in multiple
>>> > parallel instances, limiting performance?
>>> > Similarly, How does Stateful ParDo behave on Bounded Non-Keyed
>>> > PCollection?
>>> >
>>> > Thanks,
>>> > Rahul
Re: Stateful ParDo on Non-Keyed PCollection
Have you looked at the GroupIntoBatches transform?

On Thu, Jul 25, 2019 at 9:34 AM rahul patwari wrote:

> So, If an RPC call has to be performed for a batch of
> Rows(PCollection), instead of each Row, the recommended way is to
> batch the Rows in startBundle() of DoFn(
> https://stackoverflow.com/questions/49094781/yield-results-in-finish-bundle-from-a-custom-dofn/49101711#49101711)?
> I thought Stateful and Timely Processing could be helpful here.
>
> On Thu, Jul 25, 2019 at 9:54 PM Robert Bradshaw wrote:
>
>> Though it's not obvious in the name, Stateful ParDos can only be
>> applied to keyed PCollections, similar to GroupByKey. (You could,
>> however, assign every element to the same key and then apply a
>> Stateful DoFn, though in that case all elements would get processed on
>> the same worker.)
>>
>> On Thu, Jul 25, 2019 at 6:06 PM rahul patwari wrote:
>> >
>> > Hi,
>> >
>> > https://beam.apache.org/blog/2017/02/13/stateful-processing.html
>> > gives an example of assigning an arbitrary-but-consistent index to each
>> > element on a per key-and-window basis.
>> >
>> > If the Stateful ParDo is applied on a Non-Keyed PCollection, say,
>> > PCollection with Fixed Windows, the state is maintained per window and
>> > every element in the window will be assigned a consistent index?
>> > Does this mean every element belonging to the window will be processed
>> > in a single DoFn Instance, which otherwise could have been done in multiple
>> > parallel instances, limiting performance?
>> > Similarly, How does Stateful ParDo behave on Bounded Non-Keyed
>> > PCollection?
>> >
>> > Thanks,
>> > Rahul
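For readers unfamiliar with the transform mentioned above, here is a rough plain-Java sketch of the idea of batching values per key up to a fixed size. It does not use or reproduce the Beam GroupIntoBatches implementation: `KeyedBatcher`, `add`, `flushAll` and `batchSize` are hypothetical names, and `flushAll` merely stands in for whatever point the remaining partial batches are released.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain Java, NOT the Beam API. All names here are hypothetical.
class KeyedBatcher<K, V> {
    private final int batchSize;
    // One buffer per key: batches never mix values from different keys.
    private final Map<K, List<V>> buffers = new LinkedHashMap<>();
    // Emitted (key, batch) pairs, in emission order.
    final List<Map.Entry<K, List<V>>> output = new ArrayList<>();

    KeyedBatcher(int batchSize) {
        if (batchSize < 1) throw new IllegalArgumentException("batchSize must be >= 1");
        this.batchSize = batchSize;
    }

    // Buffers the value under its key and emits a batch once it is full.
    void add(K key, V value) {
        List<V> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        buffer.add(value);
        if (buffer.size() >= batchSize) emit(key, buffer);
    }

    // Emits every non-empty partial batch that is still buffered.
    void flushAll() {
        for (Map.Entry<K, List<V>> e : buffers.entrySet()) {
            if (!e.getValue().isEmpty()) emit(e.getKey(), e.getValue());
        }
    }

    private void emit(K key, List<V> buffer) {
        output.add(new SimpleEntry<K, List<V>>(key, new ArrayList<V>(buffer)));
        buffer.clear();
    }
}
```

Because buffers are held per key, the number of distinct keys bounds how many batches can be built independently, which is why the choice of keys matters for parallelism.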
Re: Stateful ParDo on Non-Keyed PCollection
So, If an RPC call has to be performed for a batch of Rows(PCollection),
instead of each Row, the recommended way is to batch the Rows in
startBundle() of DoFn(
https://stackoverflow.com/questions/49094781/yield-results-in-finish-bundle-from-a-custom-dofn/49101711#49101711)?
I thought Stateful and Timely Processing could be helpful here.

On Thu, Jul 25, 2019 at 9:54 PM Robert Bradshaw wrote:

> Though it's not obvious in the name, Stateful ParDos can only be
> applied to keyed PCollections, similar to GroupByKey. (You could,
> however, assign every element to the same key and then apply a
> Stateful DoFn, though in that case all elements would get processed on
> the same worker.)
>
> On Thu, Jul 25, 2019 at 6:06 PM rahul patwari wrote:
> >
> > Hi,
> >
> > https://beam.apache.org/blog/2017/02/13/stateful-processing.html gives
> > an example of assigning an arbitrary-but-consistent index to each element
> > on a per key-and-window basis.
> >
> > If the Stateful ParDo is applied on a Non-Keyed PCollection, say,
> > PCollection with Fixed Windows, the state is maintained per window and
> > every element in the window will be assigned a consistent index?
> > Does this mean every element belonging to the window will be processed
> > in a single DoFn Instance, which otherwise could have been done in multiple
> > parallel instances, limiting performance?
> > Similarly, How does Stateful ParDo behave on Bounded Non-Keyed
> > PCollection?
> >
> > Thanks,
> > Rahul
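As a rough illustration of per-bundle batching, the plain-Java sketch below mimics a DoFn's bundle lifecycle without using the Beam API. `BundleBatcher`, `maxBatchSize` and the `rpc` callback are hypothetical names. It assumes the usual division of labour: the buffer is reset when a bundle starts, filled per element, and flushed when the bundle finishes, so nothing is carried across bundles.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java stand-in for a DoFn's bundle lifecycle; NOT the Beam API.
// All names here are hypothetical.
class BundleBatcher<T> {
    private final int maxBatchSize;
    private final Consumer<List<T>> rpc;          // stands in for the batched RPC call
    private final List<T> buffer = new ArrayList<>();

    BundleBatcher(int maxBatchSize, Consumer<List<T>> rpc) {
        if (maxBatchSize < 1) throw new IllegalArgumentException("maxBatchSize must be >= 1");
        this.maxBatchSize = maxBatchSize;
        this.rpc = rpc;
    }

    // Called once before the first element of a bundle: start with an empty buffer.
    void startBundle() {
        buffer.clear();
    }

    // Called once per element: buffer it and flush when the batch is full.
    void processElement(T element) {
        buffer.add(element);
        if (buffer.size() >= maxBatchSize) flush();
    }

    // Called once after the last element of a bundle: flush the remainder.
    void finishBundle() {
        flush();
    }

    private void flush() {
        if (buffer.isEmpty()) return;
        rpc.accept(new ArrayList<>(buffer));      // hand the RPC its own copy
        buffer.clear();
    }
}
```

The batch size actually achieved is capped by the bundle size, which the runner decides; with small bundles, for example in streaming, many batches will be smaller than `maxBatchSize`.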
Re: Stateful ParDo on Non-Keyed PCollection
Though it's not obvious in the name, Stateful ParDos can only be
applied to keyed PCollections, similar to GroupByKey. (You could,
however, assign every element to the same key and then apply a
Stateful DoFn, though in that case all elements would get processed on
the same worker.)

On Thu, Jul 25, 2019 at 6:06 PM rahul patwari wrote:
>
> Hi,
>
> https://beam.apache.org/blog/2017/02/13/stateful-processing.html gives an
> example of assigning an arbitrary-but-consistent index to each element on a
> per key-and-window basis.
>
> If the Stateful ParDo is applied on a Non-Keyed PCollection, say,
> PCollection with Fixed Windows, the state is maintained per window and
> every element in the window will be assigned a consistent index?
> Does this mean every element belonging to the window will be processed in a
> single DoFn Instance, which otherwise could have been done in multiple
> parallel instances, limiting performance?
> Similarly, How does Stateful ParDo behave on Bounded Non-Keyed PCollection?
>
> Thanks,
> Rahul
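To make the per-key scoping of state concrete, here is a minimal plain-Java sketch of index assignment with one counter per key. It is not the Beam state API: `IndexPerKey` and `assign` are hypothetical names, and the window dimension is left out for brevity.

```java
import java.util.HashMap;
import java.util.Map;

// Plain Java, NOT the Beam state API. All names here are hypothetical.
class IndexPerKey<K> {
    // One counter per key; a real stateful step would also scope it per window.
    private final Map<K, Integer> nextIndex = new HashMap<>();

    // Returns the next index for this key and advances that key's counter.
    int assign(K key) {
        int index = nextIndex.getOrDefault(key, 0);
        nextIndex.put(key, index + 1);
        return index;
    }
}
```

Each key's counter is independent, so different keys can be handled in parallel. If every element is given the same key, all of them pass through a single counter one after another, which is the single-worker bottleneck described above.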