Happy to give a concrete example; I even have open source code I can share
in this case :)
https://github.com/google/xarray-beam/tree/9728970aa18abddafec22a23cad92b5d4a1e11e5/examples
https://github.com/google/xarray-beam/blob/9728970aa18abddafec22a23cad92b5d4a1e11e5/examples/era5_rechunk.py

This particular example reads and writes a 25 TB weather dataset stored in
Google Cloud Storage. The dataset consists of 19 variables, each of which
is logically a 3D array of shape (350640, 721, 1440), stored in blocks of
shape (31, 721, 1440) via Zarr <https://zarr.readthedocs.io/en/stable/>.
Now I want to "rechunk" them into blocks of shape (350640, 5, 5), which is
more convenient for queries like "Return the past 40 years of weather for
this particular location". To be clear, this particular use-case is
synthetic, but it reflects a common pattern for large-scale processing of
weather and climate datasets.
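
For anyone skimming, here is roughly what that pipeline looks like,
simplified from the linked era5_rechunk.py. The paths, the itemsize, and the
exact keyword arguments below are illustrative stand-ins rather than a
verbatim copy of the example:

import apache_beam as beam
import xarray
import xarray_beam as xbeam

# Illustrative paths; the real dataset lives in Zarr stores on GCS.
INPUT_PATH = 'gs://my-bucket/era5/source.zarr'
OUTPUT_PATH = 'gs://my-bucket/era5/rechunked.zarr'

# Chunks as stored today vs. the chunks convenient for per-location queries.
source_chunks = {'time': 31, 'latitude': 721, 'longitude': 1440}
target_chunks = {'time': 350640, 'latitude': 5, 'longitude': 5}


def run():
  source_dataset = xarray.open_zarr(INPUT_PATH, chunks=None, consolidated=True)
  # Lazy "template" describing the output dataset's structure, without its data.
  template = xarray.zeros_like(source_dataset.chunk())
  with beam.Pipeline() as p:
    (
        p
        | xbeam.DatasetToChunks(source_dataset, source_chunks)
        | xbeam.Rechunk(dict(source_dataset.sizes), source_chunks,
                        target_chunks, itemsize=4)  # itemsize is illustrative
        | xbeam.ChunksToZarr(OUTPUT_PATH, template, target_chunks)
    )


if __name__ == '__main__':
  run()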

I originally wrote my pipeline to process all 19 variables at once, but it
looks like it would be more efficient to process them separately. So now I
want to essentially re-run my original pipeline 19 times in parallel.

For this particular codebase, I think the right call probably *is* to
rewrite all my underlying transforms to handle an expanded key, including
the variable name. This will pay other dividends. But if I didn't want to
do that refactor, I would need to duplicate or Partition the PCollection
into 19 parts, which seems like a lot. My xarray_beam.Rechunk() transform
internally includes a few GroupByKey transforms and definitely cannot
operate in memory.
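
For concreteness, the Partition workaround would give the pipeline roughly
this shape. Everything below is a hypothetical toy stand-in (a trivial
transform in place of my real Rechunk, and made-up variable names), just to
illustrate the structure:

import apache_beam as beam

# Hypothetical variable names; the real dataset has 19 of these.
VARIABLES = ['temperature', 'humidity', 'pressure']


class RechunkOneVariable(beam.PTransform):
  """Toy stand-in for my existing single-variable transform.

  The real transform contains GroupByKey steps internally, which is why it
  can't simply run inside one DoFn downstream of a GroupByKey.
  """

  def expand(self, pcoll):
    return (
        pcoll
        | beam.GroupByKey()  # placeholder for the real shuffle
        | beam.MapTuple(lambda key, values: (key, sorted(values))))


def run():
  with beam.Pipeline() as p:
    # (variable name, chunk index) -> chunk data
    chunks = p | beam.Create([
        (('temperature', 0), 1.0),
        (('temperature', 1), 2.0),
        (('humidity', 0), 3.0),
        (('pressure', 0), 4.0),
    ])

    # Split into one PCollection per variable...
    parts = chunks | beam.Partition(
        lambda kv, n: VARIABLES.index(kv[0][0]), len(VARIABLES))

    # ...apply the existing transform to each part separately...
    rechunked = [
        part | f'Rechunk_{name}' >> RechunkOneVariable()
        for name, part in zip(VARIABLES, parts)
    ]

    # ...and merge the results back together.
    merged = tuple(rechunked) | beam.Flatten()
    merged | beam.Map(print)


if __name__ == '__main__':
  run()

Doable, but it hard-wires the pipeline for exactly 19 parts, which is the
bit that feels like a lot compared to just expanding the key.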



On Mon, May 24, 2021 at 4:12 PM Reuven Lax <re...@google.com> wrote:

> Can you explain a bit more? Where are these data sets coming from?
>
> On Mon, May 24, 2021 at 3:55 PM Stephan Hoyer <sho...@google.com> wrote:
>
>> I'm not concerned with key-dependent topologies, which I didn't even
>> think were possible to express in Beam.
>>
>> It's more that I already wrote a PTransform for processing a *single* 1
>> TB dataset. Now I want to write a single PTransform that effectively runs
>> the original PTransform in groups over ~20 such datasets (ideally without
>> needing to know that number 20 ahead of time).
>>
>> On Mon, May 24, 2021 at 3:30 PM Reuven Lax <re...@google.com> wrote:
>>
>>> Is the issue that you have a different topology depending on the key?
>>>
>>> On Mon, May 24, 2021 at 2:49 PM Stephan Hoyer <sho...@google.com> wrote:
>>>
>>>> Exactly: my use-case has another nested GroupByKey to apply per key.
>>>> But even if it could be done in a streaming fashion, it's way too much data
>>>> (1 TB) to process on a single worker in a reasonable amount of time.
>>>>
>>>> On Mon, May 24, 2021 at 2:46 PM Kenneth Knowles <k...@apache.org>
>>>> wrote:
>>>>
>>>>> I was thinking there was some non-trivial topology (such as further
>>>>> GBKs) within the logic to be applied to each key group.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Mon, May 24, 2021 at 2:38 PM Brian Hulette <bhule...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Isn't it possible to read the grouped values produced by a GBK from
>>>>>> an Iterable and yield results as you go, without needing to collect
>>>>>> all of each input into memory? Perhaps I'm misunderstanding your
>>>>>> use-case.
>>>>>>
>>>>>> Brian
>>>>>>
>>>>>> On Mon, May 24, 2021 at 10:41 AM Kenneth Knowles <k...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> I'm just pinging this thread because I think it is an interesting
>>>>>>> problem and don't want it to slip by.
>>>>>>>
>>>>>>> I bet a lot of users have gone through the tedious conversion you
>>>>>>> describe. Of course, it may often not be possible if you are using a
>>>>>>> library transform. There are a number of aspects of the Beam model
>>>>>>> that are designed a specific way explicitly *because* we need to
>>>>>>> assume that a large number of composites in your pipeline are not
>>>>>>> modifiable by you. Most closely related: this is why windowing is
>>>>>>> something carried along implicitly rather than just a parameter to
>>>>>>> GBK - that would require all transforms to expose how they use GBK
>>>>>>> under the hood and they would all have to plumb this extra
>>>>>>> key/WindowFn through every API. Instead, we have this way to
>>>>>>> implicitly add a second key to any transform :-)
>>>>>>>
>>>>>>> So in addition to being tedious for you, it would be good to have a
>>>>>>> better solution.
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>> On Fri, May 21, 2021 at 7:18 PM Stephan Hoyer <sho...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I'd like to write a Beam PTransform that applies an *existing* Beam
>>>>>>>> transform to each set of grouped values, separately, and combines the
>>>>>>>> result. Is anything like this possible with Beam using the Python SDK?
>>>>>>>>
>>>>>>>> Here are the closest things I've come up with:
>>>>>>>> 1. If each set of *inputs* to my transform fits into memory, I
>>>>>>>> could use GroupByKey followed by FlatMap.
>>>>>>>> 2. If each set of *outputs* from my transform fits into memory, I
>>>>>>>> could use CombinePerKey.
>>>>>>>> 3. If I knew the static number of groups ahead of time, I could use
>>>>>>>> Partition, followed by applying my transform multiple times,
>>>>>>>> followed by Flatten.
>>>>>>>>
>>>>>>>> In my scenario, none of these holds true. For example, currently I
>>>>>>>> have ~20 groups of values, with each group holding ~1 TB of data.
>>>>>>>> My custom transform simply shuffles this TB of data around, so each
>>>>>>>> set of outputs is also 1 TB in size.
>>>>>>>>
>>>>>>>> In my particular case, it seems my options are to either relax
>>>>>>>> these constraints, or to manually convert each step of my existing
>>>>>>>> transform to apply per key. This conversion process is tedious, but
>>>>>>>> very straightforward, e.g., the GroupByKey and ParDo that my
>>>>>>>> transform is built out of just need to deal with an expanded key.
>>>>>>>>
>>>>>>>> I wonder, could this be something built into Beam itself, e.g., as
>>>>>>>> TransformPerKey? The PTransforms that result from combining other
>>>>>>>> Beam transforms (e.g., _ChainPTransform in Python) are private, so
>>>>>>>> this seems like something that would need to exist in Beam itself,
>>>>>>>> if it could exist at all.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Stephan
>>>>>>>>
>>>>>>>
