Exactly, my use-case has another nested GroupByKey to apply per key. But
even if it could be done in a streaming fashion, it's way too much data
(~1 TB per group) to process on a single worker in a reasonable amount of
time.

On Mon, May 24, 2021 at 2:46 PM Kenneth Knowles <k...@apache.org> wrote:

> I was thinking there was some non-trivial topology (such as further GBKs)
> within the logic to be applied to each key group.
>
> Kenn
>
> On Mon, May 24, 2021 at 2:38 PM Brian Hulette <bhule...@google.com> wrote:
>
>> Isn't it possible to read the grouped values produced by a GBK from an
>> Iterable and yield results as you go, without needing to collect all of
>> each input into memory? Perhaps I'm misunderstanding your use-case.
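>>
>> Something along these lines, as a rough sketch (pcoll is your keyed
>> PCollection and process_one is just a placeholder for per-element logic):
>>
>>     import apache_beam as beam
>>
>>     def expand_group(element):
>>         key, values = element
>>         # `values` is an Iterable; on runners like Dataflow it can be
>>         # streamed lazily rather than held fully in memory.
>>         for v in values:
>>             yield key, process_one(v)  # placeholder
>>
>>     results = (
>>         pcoll
>>         | beam.GroupByKey()
>>         | beam.FlatMap(expand_group))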
>>
>> Brian
>>
>> On Mon, May 24, 2021 at 10:41 AM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> I'm just pinging this thread because I think it is an interesting
>>> problem and don't want it to slip by.
>>>
>>> I bet a lot of users have gone through the tedious conversion you
>>> describe. Of course, it may often not be possible if you are using a
>>> library transform. There are a number of aspects of the Beam model that are
>>> designed a specific way explicitly *because* we need to assume that a large
>>> number of composites in your pipeline are not modifiable by you. Most
>>> closely related: this is why windowing is something carried along
>>> implicitly rather than just a parameter to GBK - that would require all
>>> transforms to expose how they use GBK under the hood and they would all
>>> have to plumb this extra key/WindowFn through every API. Instead, we have
>>> this way to implicitly add a second key to any transform :-)
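>>>
>>> As a rough illustration (SomeLibraryTransform is just a placeholder for
>>> a composite you can't modify):
>>>
>>>     import apache_beam as beam
>>>     from apache_beam.transforms.window import FixedWindows
>>>
>>>     # Any GroupByKey buried inside the composite now also groups by
>>>     # window, without the composite having to know or expose anything.
>>>     result = (
>>>         pcoll
>>>         | beam.WindowInto(FixedWindows(60))
>>>         | SomeLibraryTransform())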
>>>
>>> So in addition to being tedious for you, it would be good to have a
>>> better solution.
>>>
>>> Kenn
>>>
>>> On Fri, May 21, 2021 at 7:18 PM Stephan Hoyer <sho...@google.com> wrote:
>>>
>>>> I'd like to write a Beam PTransform that applies an *existing* Beam
>>>> transform to each set of grouped values, separately, and combines the
>>>> result. Is anything like this possible with Beam using the Python SDK?
>>>>
>>>> Here are the closest things I've come up with (rough sketches below):
>>>> 1. If each set of *inputs* to my transform fit into memory, I could
>>>> use GroupByKey followed by FlatMap.
>>>> 2. If each set of *outputs* from my transform fit into memory, I could
>>>> use CombinePerKey.
>>>> 3. If I knew the static number of groups ahead of time, I could use
>>>> Partition, followed by applying my transform multiple times, followed by
>>>> Flatten.
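>>>>
>>>> Rough sketches of what I mean by (1) and (3), with placeholder names
>>>> (apply_in_memory, group_index, MyTransform, num_groups are made up):
>>>>
>>>>     import apache_beam as beam
>>>>
>>>>     # (1) Only viable if each group's values fit in worker memory;
>>>>     # apply_in_memory returns an iterable of output elements.
>>>>     out1 = (
>>>>         pcoll
>>>>         | beam.GroupByKey()
>>>>         | beam.FlatMap(lambda kv: apply_in_memory(kv[0], kv[1])))
>>>>
>>>>     # (3) Only viable if the number of groups is known up front.
>>>>     parts = pcoll | beam.Partition(
>>>>         lambda x, n: group_index(x), num_groups)
>>>>     applied = [parts[i] | f'Apply{i}' >> MyTransform()
>>>>                for i in range(num_groups)]
>>>>     out3 = applied | beam.Flatten()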
>>>>
>>>> In my scenario, none of these holds true. For example, currently I have
>>>> ~20 groups of values, with each group holding ~1 TB of data. My custom
>>>> transform simply shuffles this TB of data around, so each set of outputs is
>>>> also ~1 TB in size.
>>>>
>>>> In my particular case, it seems my options are to either relax these
>>>> constraints, or to manually convert each step of my existing transform to
>>>> apply per key. This conversion process is tedious, but very
>>>> straightforward, e.g., the GroupByKey and ParDo that my transform is built
>>>> out of just need to deal with an expanded key.
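>>>>
>>>> For instance, if each element carries the group key alongside the
>>>> original key, an inner GroupByKey just switches to a compound key
>>>> (sketch; the element layout here is made up):
>>>>
>>>>     # elements: (group_key, (k, v))  ->  ((group_key, k), v)
>>>>     expanded = pcoll | beam.Map(lambda e: ((e[0], e[1][0]), e[1][1]))
>>>>     grouped = expanded | beam.GroupByKey()  # one group per (group_key, k)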
>>>>
>>>> I wonder, could this be something built into Beam itself, e.g., as
>>>> TransformPerKey? The PTransforms that result from combining other Beam
>>>> transforms (e.g., _ChainPTransform in Python) are private, so this seems
>>>> like something that would need to exist in Beam itself, if it could exist
>>>> at all.
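>>>>
>>>> Purely hypothetically, usage could look something like:
>>>>
>>>>     # TransformPerKey does not exist today; this is just the shape I mean.
>>>>     result = keyed_pcoll | beam.TransformPerKey(MyExistingTransform())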
>>>>
>>>> Cheers,
>>>> Stephan
>>>>
>>>
