I was thinking there was some non-trivial topology (such as further GBKs) within the logic to be applied to each key group.
Kenn

On Mon, May 24, 2021 at 2:38 PM Brian Hulette <[email protected]> wrote:

> Isn't it possible to read the grouped values produced by a GBK from an
> Iterable and yield results as you go, without needing to collect all of
> each input into memory? Perhaps I'm misunderstanding your use-case.
>
> Brian
>
> On Mon, May 24, 2021 at 10:41 AM Kenneth Knowles <[email protected]> wrote:
>
>> I'm just pinging this thread because I think it is an interesting problem
>> and don't want it to slip by.
>>
>> I bet a lot of users have gone through the tedious conversion you
>> describe. Of course, it may often not be possible if you are using a
>> library transform. There are a number of aspects of the Beam model that are
>> designed a specific way explicitly *because* we need to assume that a large
>> number of composites in your pipeline are not modifiable by you. Most
>> closely related: this is why windowing is something carried along
>> implicitly rather than just a parameter to GBK - that would require all
>> transforms to expose how they use GBK under the hood and they would all
>> have to plumb this extra key/WindowFn through every API. Instead, we have
>> this way to implicitly add a second key to any transform :-)
>>
>> So in addition to being tedious for you, it would be good to have a
>> better solution.
>>
>> Kenn
>>
>> On Fri, May 21, 2021 at 7:18 PM Stephan Hoyer <[email protected]> wrote:
>>
>>> I'd like to write a Beam PTransform that applies an *existing* Beam
>>> transform to each set of grouped values, separately, and combines the
>>> result. Is anything like this possible with Beam using the Python SDK?
>>>
>>> Here are the closest things I've come up with:
>>> 1. If each set of *inputs* to my transform fits into memory, I could use
>>> GroupByKey followed by FlatMap.
>>> 2. If each set of *outputs* from my transform fits into memory, I could
>>> use CombinePerKey.
>>> 3. If I knew the static number of groups ahead of time, I could use
>>> Partition, followed by applying my transform multiple times, followed by
>>> Flatten.
>>>
>>> In my scenario, none of these holds true. For example, currently I have
>>> ~20 groups of values, with each group holding ~1 TB of data. My custom
>>> transform simply shuffles this TB of data around, so each set of outputs is
>>> also 1 TB in size.
>>>
>>> In my particular case, it seems my options are to either relax these
>>> constraints, or to manually convert each step of my existing transform to
>>> apply per key. This conversion process is tedious, but very
>>> straightforward, e.g., the GroupByKey and ParDo that my transform is built
>>> out of just need to deal with an expanded key.
>>>
>>> I wonder, could this be something built into Beam itself, e.g., as
>>> TransformPerKey? The PTransforms that result from combining other Beam
>>> transforms (e.g., _ChainPTransform in Python) are private, so this seems
>>> like something that would need to exist in Beam itself, if it could exist
>>> at all.
>>>
>>> Cheers,
>>> Stephan
>>>
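
For readers following the thread, here is a minimal, Beam-free sketch of the "expanded key" conversion described in the original message: the outer group key is folded into whatever key the existing transform already groups on, so a later grouping step can never mix elements from different outer groups. The names `expand_key` and `group_by_key` and the sample records are hypothetical, chosen only for illustration; `group_by_key` is a toy in-memory stand-in and not Beam's GroupByKey.

```python
from collections import defaultdict
from typing import Any, Dict, Hashable, Iterable, List, Tuple


def expand_key(
    outer_key: Hashable, element: Tuple[Hashable, Any]
) -> Tuple[Tuple[Hashable, Hashable], Any]:
    """Fold the outer (per-group) key into the key the inner step groups on."""
    inner_key, value = element
    return (outer_key, inner_key), value


def group_by_key(pairs: Iterable[Tuple[Hashable, Any]]) -> Dict[Hashable, List[Any]]:
    """Toy in-memory grouping, for illustration only (assumption: not Beam)."""
    groups: Dict[Hashable, List[Any]] = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)


# Hypothetical input: (outer_key, (inner_key, value)) records.
records = [
    ("group_a", ("x", 1)),
    ("group_a", ("x", 2)),
    ("group_b", ("x", 10)),
    ("group_a", ("y", 3)),
]

# Re-key every record, then group on the expanded (outer, inner) key.
expanded = [expand_key(outer, element) for outer, element in records]
grouped = group_by_key(expanded)
```

Both `group_a` and `group_b` use the inner key `"x"`, yet their values end up in separate groups because the grouping key is now the pair `(outer_key, inner_key)`. In an actual pipeline the same re-keying would presumably be applied as a mapping step ahead of each GroupByKey inside the transform, which is the tedious part the thread is about.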
