Thank you!

On Sep 27, 2023, at 11:05 AM, Byron Ellis via user <user@beam.apache.org> wrote:

Hi Logan,

I suspect your issue is that you're using "Create" to hold all of your data. That particular transform's data is typically stored within the pipeline definition itself, which is probably why you're seeing the protobuf max-size errors. "Create" is really meant for small inputs, like command line parameters you might want to send into a transform, or for testing. Making your data available in a form that one of the IO transforms (say, FileIO) can read will let you scale up.
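
For instance, here is a minimal sketch of the idea using Python's ReadFromText in place of Create (the bucket path and the one-JSON-object-per-line layout are just assumptions for illustration):

```
import json

import apache_beam as beam

with beam.Pipeline() as pipeline:
    pages = (
        pipeline
        # Each worker reads its own shard of the matched files, so the
        # data never has to live inside the serialized pipeline definition.
        | beam.io.ReadFromText("gs://your-bucket/pages-*.jsonl")
        | beam.Map(json.loads))  # one JSON-encoded page per line
    # ... the rest of the pipeline hangs off `pages` as before ...
```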

Best,
B

On Tue, Sep 26, 2023 at 9:54 PM Logan G Engstrom <engst...@mit.edu> wrote:
Hi,


I am new to Apache Beam and have been struggling with performance; despite searching around I have not been able to find a solution. Even though I am using parallelizable operations and converting my data to PCollections, Beam is incredibly slow. Furthermore, when running on larger amounts of data (more than a few hundred thousand rows) I hit data-size errors ("exceeds maximum protobuf size of 2GB" on some operations).

I am just trying to reproduce code from another research group (copied from an online reproduction of the C4 dataset: https://github.com/tensorflow/datasets/blame/master/tensorflow_datasets/text/c4_utils.py), so while the individual operations are likely fine, I am worried there is some simple setup issue that I can't figure out.

I think the issue might be with how I start the pipeline, or with how I pass data into it. Help would be greatly appreciated! Here is a skeleton of my code:

```
import apache_beam as beam

pipeline = beam.Pipeline()

# pages is entirely Python objects: a list of (string, dict) tuples
page_features = pipeline | beam.Create(pages)

line_to_selected_url = (
    page_features
    | beam.FlatMap(_emit_url_to_lines)  # this is copied, so it should be ok
    | beam.combiners.Top.PerKey(1, key=_hash_text, reverse=True))  # this is copied, so it should be ok

lines_to_keep = line_to_selected_url | beam.Map(lambda x: (x[1][0], x[0]))

final_docs = ({"features": page_features, "lines": lines_to_keep}
              | "group_features_and_lines_by_url" >> beam.CoGroupByKey()
              | beam.FlatMap(
                  _remove_lines_from_text,
                  min_num_sentences=min_num_sentences)
              | beam.combiners.ToList())
res = final_docs | ("Write to JSON" >> beam.Map(hacky_save))
result = res.pipeline.run()
result.wait_until_finish()
```

I think the issue could also lie with how I run the pipeline. To run the code I pass the parameters `--autoscalingAlgorithm=BASIC --maxWorkers=32` to the script.

Thank you so much for reading this far!

- L
