Thank you!

On Sep 27, 2023, at 11:05 AM, Byron Ellis via user <user@beam.apache.org> wrote:
Hi Logan,

I suspect your issue is that you're using "Create" to hold all of your data. That particular transform's data is typically stored within the pipeline definition itself, which is probably why you're seeing the protobuf max-size errors. It's really meant for things more like command-line parameters that you might want to send into a transform, or for testing. Making your data available in a form that can be read by one of the IO transforms (say, FileIO) will let you scale up.

Best,
B

On Tue, Sep 26, 2023 at 9:54 PM Logan G Engstrom <engst...@mit.edu> wrote:

Hi,

I am new to Apache Beam and have had trouble with performance; despite searching around, I have not been able to identify a solution. Even though I am using parallelizable operations and converting to PCollections, Beam is incredibly slow. Furthermore, when running on larger amounts of data (more than a few hundred thousand rows), I get data-size errors (i.e., "exceeds maximum protobuf size of 2GB" for operations). I am just trying to reproduce code from another research group (copied from an online reproduction of the C4 dataset <https://github.com/tensorflow/datasets/blame/master/tensorflow_datasets/text/c4_utils.py>), so while the individual operations are likely fine, I am worried about a simple setup issue that I can't figure out. I think the issue might be with how I start the pipeline, or with how I pass data to it. Help would be greatly appreciated!
Here is a skeleton of my code:

```
pipeline = beam.Pipeline()

line_to_selected_url = (
    pipeline
    | beam.Create(pages)  # pages is entirely Python objects: a list of (string, dict) tuples
    | beam.FlatMap(_emit_url_to_lines)  # this is copied so it should be ok
    | beam.combiners.Top.PerKey(1, key=_hash_text, reverse=True))  # this is copied so it should be ok

lines_to_keep = line_to_selected_url | beam.Map(lambda x: (x[1][0], x[0]))

final_docs = (
    {"features": pages, "lines": lines_to_keep}
    | "group_features_and_lines_by_url" >> beam.CoGroupByKey()
    | beam.FlatMap(_remove_lines_from_text, min_num_sentences=min_num_sentences)
    | beam.combiners.ToList())

res = final_docs | ("Write to JSON" >> beam.Map(hacky_save))

result = res.pipeline.run()
result.wait_until_finish()
```

I think the issue could also lie with how I run the pipeline. To run the code I am passing the parameters `--autoscalingAlgorithm=BASIC --maxWorkers=32` to the script.

Thank you so much for reading this far!

- L
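Byron's suggestion above can be sketched as follows: instead of embedding the in-memory `pages` list in the pipeline via `beam.Create`, stage it on disk as newline-delimited JSON so a file-based IO transform (e.g. `beam.io.ReadFromText`) can read it. This is a minimal sketch; the `pages` structure (a list of `(url, features)` tuples) is assumed from Logan's description, and the file names are illustrative.

```python
import json
import os
import tempfile

# Illustrative stand-in for the real `pages` list of (string, dict) tuples.
pages = [("http://example.com/a", {"text": "line one\nline two"}),
         ("http://example.com/b", {"text": "another page"})]

# Stage the data as newline-delimited JSON so it lives in files,
# not inside the serialized pipeline protobuf.
staging_dir = tempfile.mkdtemp()
shard_path = os.path.join(staging_dir, "pages-00000-of-00001.jsonl")
with open(shard_path, "w") as f:
    for url, features in pages:
        f.write(json.dumps({"url": url, "features": features}) + "\n")

# In the pipeline, `beam.Create(pages)` would then be replaced with
# something like:
#
#   pages_pc = (
#       pipeline
#       | beam.io.ReadFromText(os.path.join(staging_dir, "pages-*.jsonl"))
#       | beam.Map(json.loads)
#       | beam.Map(lambda d: (d["url"], d["features"])))
#
# so workers read the data from files rather than from the pipeline definition.
```

For larger datasets you would write multiple shards and point `ReadFromText` at a glob pattern, letting the runner parallelize the read across workers.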