Hi Logan,

I suspect your issue is that you're using beam.Create to hold all of your
data. The elements passed to Create are typically serialized into the
pipeline definition itself, which is probably why you're seeing the
protobuf maximum size errors. Create is really meant for small inputs, such
as command line parameters that you want to send into a transform, or for
testing. Making your data available in a form that one of the IO transforms
(say FileIO) can read will let you scale up.
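
As a rough sketch of what that could look like (not tested against your
code): write the pages out as JSON Lines shards once, then have the
pipeline read them back. The helper names, the JSON Lines layout and the
shard size are all assumptions for illustration, and I'm using
beam.io.ReadFromText here rather than FileIO only because it is the
shortest way to read text files.

```python
import json
import os
import tempfile


def write_pages_jsonl(pages, out_dir, shard_size=10000):
    """Write (url, features) tuples as JSON Lines shards; return the paths.

    Hypothetical helper: the file naming and shard size are arbitrary.
    """
    os.makedirs(out_dir, exist_ok=True)
    paths = []
    shard = None
    for i, (url, features) in enumerate(pages):
        # Start a new shard file every `shard_size` records.
        if i % shard_size == 0:
            if shard is not None:
                shard.close()
            path = os.path.join(out_dir, f"pages-{i // shard_size:05d}.jsonl")
            paths.append(path)
            shard = open(path, "w", encoding="utf-8")
        shard.write(json.dumps({"url": url, "features": features}) + "\n")
    if shard is not None:
        shard.close()
    return paths


def parse_page(line):
    """Turn one JSON line back into a (url, features) tuple."""
    record = json.loads(line)
    return record["url"], record["features"]


def build_pipeline(file_pattern):
    """Build a pipeline that reads pages from files instead of beam.Create."""
    # Imported here so the two helpers above can be used without Beam.
    import apache_beam as beam

    pipeline = beam.Pipeline()
    pages = (
        pipeline
        | "ReadPages" >> beam.io.ReadFromText(file_pattern)
        | "ParsePages" >> beam.Map(parse_page)
    )
    return pipeline, pages
```

You would call write_pages_jsonl(pages, out_dir) once, then
build_pipeline(os.path.join(out_dir, "pages-*.jsonl")) and use the returned
PCollection where the skeleton has `pipeline | beam.Create(pages)`. This
assumes the feature dicts are JSON-serializable, and on a distributed
runner the files would need to live somewhere the workers can read.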

Best,
B

On Tue, Sep 26, 2023 at 9:54 PM Logan G Engstrom <engst...@mit.edu> wrote:

> Hi,
>
> I am new to learning Apache Beam and have had trouble with performance;
> despite searching around I have not been able to identify a solution. My
> issue is that even though I am using parallelizable operations + converting
> to PCollections, Beam is incredibly slow. Furthermore, when running on
> larger amounts of data (i.e., more than a few hundred thousand rows) I get
> issues around data size (i.e., exceeds maximum protobuf size of 2GB errors
> for operations).
>
> I am just trying to reproduce code from another research group (copied
> from an online reproduction of the C4 dataset
> <https://github.com/tensorflow/datasets/blame/master/tensorflow_datasets/text/c4_utils.py>),
> so while the individual operations are likely ok, I am worried about a
> simple setup issue that I can't figure out.
>
> I think the issue might be with starting the pipeline, or with how I am
> passing data to the pipeline. Help would be greatly appreciated! Here is a
> skeleton of my code:
> ```
>
>   pipeline = beam.Pipeline()
>   line_to_selected_url = (
>       pipeline
>       | beam.Create(pages)  # pages is entirely Python objects: a list of tuples (string, dict)
>       | beam.FlatMap(_emit_url_to_lines) # this is copied so it should be ok
>       | beam.combiners.Top.PerKey(1, key=_hash_text, reverse=True))  # this is copied so it should be ok
>
>   lines_to_keep = line_to_selected_url | beam.Map(lambda x: (x[1][0], x[0]))
>
>   final_docs = ({ "features": pages, "lines": lines_to_keep }
>                 | "group_features_and_lines_by_url" >> beam.CoGroupByKey()
>                 | beam.FlatMap(
>                     _remove_lines_from_text,
>                     min_num_sentences=min_num_sentences)
>                 | beam.combiners.ToList())
>   res = final_docs | ("Write to JSON" >> beam.Map(hacky_save))
>   result = res.pipeline.run()
>   result.wait_until_finish()
>
> ```
>
> I think the issue could also lie with how I run the pipeline. To run the code 
> I am passing the parameters: `--autoscalingAlgorithm=BASIC --maxWorkers=32` 
> to the script.
>
> Thank you so much for reading this far!
>
> - L
>
>
