Beam team,

I’m currently using the Beam Go SDK to build a pipeline that generates large 
text data files and writes them to GCS.

The pipeline is relatively simple: prepare a range of source account numbers, 
expand each one into a full block of account data, and write the result to GCS 
with the textio package.

func main() {
   flag.Parse()
   ctx := context.Background()
   beam.Init()
   p := beam.NewPipeline()
   s := p.Root()
   // Get a PCollection of account numbers.
   accNumCol := beam.CreateList(s, makeRange(*accountStart, *accountEnd))
   // Transform the PCollection of account numbers into a collection of
   // strings, each representing a group of account data.
   accountCol := beam.ParDo(s, accountNumToAccountWithRows, accNumCol)
   // Write all account blocks to a single file in GCS.
   textio.Write(s, fmt.Sprintf("gs://files/CC%d-ACC%d.txt", *accountStart, *accountEnd), accountCol)
   if err := beamx.Run(ctx, p); err != nil {
      log.Exitf(ctx, "Failed to execute job: %v", err)
   }
}

Currently, for smaller files (e.g., 13 MB or 30,000 lines of text) the 
pipeline runs on Dataflow without issue. When preparing larger datasets that 
generate medium-sized files of approx. 130 MB, the write-to-GCS step in the 
Dataflow pipeline fails consistently with errors like the following:

Error message from worker: process bundle failed for instruction 
process_bundle-3 using plan process-bundle-descriptor-47 : while executing 
Process for Plan[process-bundle-descriptor-47]:
2: ParDo[textio.writeFileFn] Out:[]
1: DataSource[S[ptransform-46@localhost:12371], 0] 
Coder:W;coder-63<CoGBK;coder-64<int[varintz;c2];coder-65,string;coder-67[string]>>!GWC
 Out:2
         caused by:
source failed
         caused by:
rpc error: code = ResourceExhausted desc = grpc: received message larger than 
max (104858536 vs. 52428800)
and:
  "jsonPayload": {
    "worker": "go-job-1-1599375664191334-09060001-zizk-harness-ptm6",
    "job": "2020-09-06_00_01_25-2750019264422346896",
    "work": "process_bundle-1",
    "message": "DataChannel.read localhost:12371 bad: rpc error: code = 
ResourceExhausted desc = grpc: received message larger than max (104938332 vs. 
52428800)",
    "logger": 
<project>/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/datamgr.go:261",
    "portability_worker_id": "1"
}

When executing the pipeline locally and writing to a local file (i.e. 
example.txt instead of gs://example.txt), no issues are apparent and files of 
arbitrary size can be generated. Running the pipeline locally while writing to 
GCS also seems to work with files of any size, though the upload can be quite 
slow. It is only when the pipeline executes on Dataflow that writing the files 
to GCS fails. It seems odd that we would encounter issues uploading files of 
relatively medium size to GCS with such a simple pipeline. The errors indicate 
that something is receiving a message larger than 52428800 bytes (exactly 
50 MiB), but I am unable to determine what is receiving it, or where this 
magic number comes from.

I would appreciate any insight into this issue with our pipeline’s integration 
with GCS. Has anyone seen this before?

Regards,
Patrick Mitchell
