Re: Apply a Beam PTransform per key

2021-05-24 Thread Kenneth Knowles
I'm just pinging this thread because I think it is an interesting problem
and don't want it to slip by.

I bet a lot of users have gone through the tedious conversion you describe.
Of course, it may often not be possible if you are using a library
transform. There are a number of aspects of the Beam model that are
designed a specific way explicitly *because* we need to assume that a large
number of composites in your pipeline are not modifiable by you. Most
closely related: this is why windowing is something carried along
implicitly rather than just a parameter to GBK - that would require all
transforms to expose how they use GBK under the hood and they would all
have to plumb this extra key/WindowFn through every API. Instead, we have
this way to implicitly add a second key to any transform :-)

So in addition to being tedious for you, it would be good to have a better
solution.

Kenn
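
Kenn's windowing point can be sketched with a toy grouping helper (illustrative only, not Beam code; all names here are invented): the call site never changes, but an implicit second key component silently refines the groups, just as a window does for GBK.

```python
# Toy model of an "implicit second key": grouping silently happens on
# (key, implicit component) without the caller's API changing.
from collections import defaultdict

def group_by_key(elements, implicit_key=lambda meta: None):
    """Toy GBK over (key, value, metadata) triples. The key is silently
    extended with a component derived from metadata (think: a window,
    or a dataset name) that user code never has to plumb through."""
    groups = defaultdict(list)
    for key, value, meta in elements:
        groups[(key, implicit_key(meta))].append(value)
    return dict(groups)

# Elements carry metadata the per-element logic ignores.
elements = [("a", 1, "w1"), ("a", 2, "w1"), ("a", 3, "w2"), ("b", 4, "w1")]

flat = group_by_key(elements)                          # metadata ignored
fine = group_by_key(elements, implicit_key=lambda m: m)  # finer groups, same call site
```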

On Fri, May 21, 2021 at 7:18 PM Stephan Hoyer  wrote:

> I'd like to write a Beam PTransform that applies an *existing* Beam
> transform to each set of grouped values, separately, and combines the
> result. Is anything like this possible with Beam using the Python SDK?
>
> Here are the closest things I've come up with:
> 1. If each set of *inputs* to my transform fits into memory, I could use
> GroupByKey followed by FlatMap.
> 2. If each set of *outputs* from my transform fits into memory, I could
> use CombinePerKey.
> 3. If I knew the static number of groups ahead of time, I could use
> Partition, followed by applying my transform multiple times, followed by
> Flatten.
>
> In my scenario, none of these holds true. For example, currently I have
> ~20 groups of values, with each group holding ~1 TB of data. My custom
> transform simply shuffles this TB of data around, so each set of outputs is
> also ~1 TB in size.
>
> In my particular case, it seems my options are to either relax these
> constraints, or to manually convert each step of my existing transform to
> apply per key. This conversion process is tedious, but very
> straightforward, e.g., the GroupByKey and ParDo that my transform is built
> out of just need to deal with an expanded key.
>
> I wonder, could this be something built into Beam itself, e.g., as
> TransformPerKey? The PTransforms that result from combining other Beam
> transforms (e.g., _ChainPTransform in Python) are private, so this seems
> like something that would need to exist in Beam itself, if it could exist
> at all.
>
> Cheers,
> Stephan
>
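
Roughly, the three options above as pure-Python analogues (illustrative only, not Beam code; in Beam these correspond to GroupByKey + FlatMap, CombinePerKey, and Partition + per-partition transforms + Flatten):

```python
from collections import defaultdict

def group_then_flatmap(pairs, fn):
    """Option 1: collect each group fully in memory, then apply fn to it.
    The *inputs* per key must fit in memory."""
    groups = defaultdict(list)
    for k, v in pairs:
        groups[k].append(v)
    return [(k, out) for k, vs in groups.items() for out in fn(vs)]

def combine_per_key(pairs, combiner):
    """Option 2: fold values per key; the combined *output* per key must
    fit in memory."""
    acc = defaultdict(list)
    for k, v in pairs:
        acc[k].append(v)
    return {k: combiner(vs) for k, vs in acc.items()}

def partition_apply_flatten(pairs, n, fn):
    """Option 3: split into a static number n of partitions, apply fn to
    each partition independently, then flatten the results together."""
    parts = [[] for _ in range(n)]
    for k, v in pairs:
        parts[hash(k) % n].append((k, v))
    return [out for part in parts for out in fn(part)]

pairs = [("a", 1), ("a", 2), ("b", 3)]
out1 = group_then_flatmap(pairs, lambda vs: [sum(vs)])
out2 = combine_per_key(pairs, sum)
out3 = partition_apply_flatten(pairs, 2, lambda part: [(k, v * 10) for k, v in part])
```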


Re: Apply a Beam PTransform per key

2021-05-24 Thread Brian Hulette
Isn't it possible to read the grouped values produced by a GBK from an
Iterable and yield results as you go, without needing to collect all of
each input into memory? Perhaps I'm misunderstanding your use-case.

Brian
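
Brian's suggestion, sketched as a plain Python generator (illustrative only; in Beam this would be a DoFn consuming the lazy iterable a GBK hands it):

```python
def running_totals(key_and_values):
    """Consume a (key, iterable-of-values) pair lazily, yielding a
    running total per element instead of materializing the whole group."""
    key, values = key_and_values
    total = 0
    for v in values:  # `values` may be a lazy iterable; nothing forces a list
        total += v
        yield key, total

# The grouped values can be a generator; memory stays O(1) per group.
out = list(running_totals(("a", (x for x in [1, 2, 3]))))
```

This works for any per-group logic expressible as a single streaming pass; as the rest of the thread notes, it breaks down when the per-group logic needs its own shuffle.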



Re: Apply a Beam PTransform per key

2021-05-24 Thread Kenneth Knowles
I was thinking there was some non-trivial topology (such as further GBKs)
within the logic to be applied to each key group.

Kenn



Re: Apply a Beam PTransform per key

2021-05-24 Thread Stephan Hoyer
Exactly, my use-case has another nested GroupByKey to apply per key. But
even if it could be done in a streaming fashion, it's way too much data (1
TB) to process on a single worker in a reasonable amount of time.
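
The usual manual fix for a nested grouping like this is the expanded-key conversion discussed earlier in the thread: run one flat GBK on a composite (outer, inner) key instead of an inner GBK per outer key. A toy sketch (illustrative only, not Beam code):

```python
# One shuffle on an expanded key stands in for "an inner GroupByKey
# per outer key", so no single worker ever owns a whole outer group.
from collections import defaultdict

def nested_group_via_composite_key(records):
    """records: (outer_key, inner_key, value) triples."""
    groups = defaultdict(list)
    for outer, inner, v in records:
        groups[(outer, inner)].append(v)  # composite key: one flat grouping
    return dict(groups)

records = [("ds1", "x", 1), ("ds1", "x", 2), ("ds1", "y", 3), ("ds2", "x", 4)]
grouped = nested_group_via_composite_key(records)
```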



Re: Apply a Beam PTransform per key

2021-05-24 Thread Reuven Lax
Is the issue that you have a different topology depending on the key?



Re: Apply a Beam PTransform per key

2021-05-24 Thread Stephan Hoyer
I'm not concerned with key-dependent topologies, which I didn't even think
was possible to express in Beam.

It's more that I already wrote a PTransform for processing a *single* 1 TB
dataset. Now I want to write a single PTransform that effectively runs the
original PTransform in groups over ~20 such datasets (ideally without
needing to know that number 20 ahead of time).
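
A toy model of what the hypothetical TransformPerKey from earlier in the thread would mean semantically (illustrative only; no such transform exists in Beam's Python SDK, and this in-memory version deliberately ignores the distributed-execution problem that makes it hard):

```python
# Take an existing "single dataset" transform (a function from a list
# of elements to a list of elements) and run it independently per key,
# without knowing the number of keys up front.
from collections import defaultdict

def transform_per_key(keyed_elements, transform):
    groups = defaultdict(list)
    for k, v in keyed_elements:
        groups[k].append(v)
    # Re-run the single-dataset transform once per key, then re-key.
    return [(k, out) for k, vs in groups.items() for out in transform(vs)]

# An "existing" single-dataset transform: sort then dedupe.
def sort_and_dedupe(values):
    return sorted(set(values))

result = transform_per_key(
    [("t2m", 3), ("t2m", 1), ("t2m", 3), ("precip", 2)], sort_and_dedupe)
```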



Re: Apply a Beam PTransform per key

2021-05-24 Thread Reuven Lax
Can you explain a bit more? Where are these data sets coming from?



Re: Apply a Beam PTransform per key

2021-05-24 Thread Stephan Hoyer
Happy to give a concrete example, I even have open source code I can share
in this case :)
https://github.com/google/xarray-beam/tree/9728970aa18abddafec22a23cad92b5d4a1e11e5/examples
https://github.com/google/xarray-beam/blob/9728970aa18abddafec22a23cad92b5d4a1e11e5/examples/era5_rechunk.py

This particular example reads and writes a 25 TB weather dataset stored in
Google Cloud Storage. The dataset consists of 19 variables, each of which
is logically a 3D array of shape (350640, 721, 1440), stored in blocks of
shape (31, 721, 1440) via Zarr.
Now I want to "rechunk" them into blocks of shape (350640, 5, 5), which is
more convenient for queries like "Return the past 40 years of weather for
this particular location". To be clear, this particular use-case is
synthetic, but it reflects a common pattern for large-scale processing of
weather and climate datasets.

I originally wrote my pipeline to process all 19 variables at once, but it
looks like it would be more efficient to process them separately. So now I
want to essentially re-run my original pipeline 19 times in parallel.

For this particular codebase, I think the right call probably *is* to
rewrite all my underlying transforms to handle an expanded key, including
the variable name. This will pay other dividends. But if I didn't want to
do that refactor, I would need to duplicate or Partition the PCollection
into 19 parts, which seems like a lot. My xarray_beam.Rechunk() transform
includes a few GroupByKey transforms inside and definitely cannot operate
in-memory.
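
As a quick sanity check on the scale involved, the chunk counts implied by those shapes (pure arithmetic; no Beam or Zarr involved):

```python
import math

shape = (350640, 721, 1440)        # (time, lat, lon) per variable
src_chunks = (31, 721, 1440)       # time-sliced blocks, as stored
dst_chunks = (350640, 5, 5)        # full time series per small spatial tile

def n_chunks(shape, chunks):
    """Number of blocks needed to tile `shape` with `chunks`."""
    return math.prod(math.ceil(s / c) for s, c in zip(shape, chunks))

n_src = n_chunks(shape, src_chunks)  # blocks to read, per variable
n_dst = n_chunks(shape, dst_chunks)  # blocks to write, per variable
```

So per variable the rechunk turns ~11k source blocks into ~42k target blocks, and the shuffle in between is exactly the TB-scale data movement described above.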


