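Update for anyone who might run into this: I was able to fix this by reading with `beam.Create([input]) | beam.io.ReadAllFromAvro()` instead of `beam.io.ReadFromAvro(input)`. `ReadAllFromAvro` is applied to a PCollection of file patterns rather than being given the pattern directly. After some digging I realized that `ReadFromAvro` relies on splittable DoFns to split reads, and splittable DoFns are not supported by the SparkRunner. Switching to `ReadAllFromAvro` worked because it relies on `filebasedsource.ReadAllFiles`, which splits the work differently: it expands the matched files into byte ranges with regular DoFns and reshuffles those ranges, so they can be read on different executors.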
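To make the difference concrete, here is a toy, stdlib-only model of the strategy `ReadAllFiles` follows: expand each matched file into byte ranges, redistribute the ranges, then read each range wherever it lands. It is not Beam code. The function names, file names and sizes are hypothetical, round-robin assignment stands in for Beam's `Reshuffle` (which uses random keys), and the model ignores how a real reader aligns ranges to record boundaries.

```python
from typing import Dict, List, Tuple

# One unit of read work: (file_name, start_offset, end_offset), end exclusive.
ByteRange = Tuple[str, int, int]


def expand_into_ranges(file_sizes: Dict[str, int],
                       desired_bundle_size: int) -> List[ByteRange]:
    """Split each matched file into byte ranges of at most desired_bundle_size bytes."""
    if desired_bundle_size <= 0:
        raise ValueError("desired_bundle_size must be positive")
    ranges: List[ByteRange] = []
    for name in sorted(file_sizes):
        size = file_sizes[name]
        start = 0
        while start < size:
            end = min(start + desired_bundle_size, size)
            ranges.append((name, start, end))
            start = end
    return ranges


def distribute(ranges: List[ByteRange],
               num_workers: int) -> Dict[int, List[ByteRange]]:
    """Hand ranges out round-robin, standing in for Beam's Reshuffle step.

    Reshuffle redistributes elements using random keys; round-robin is used
    here only so that the result is deterministic.
    """
    if num_workers <= 0:
        raise ValueError("num_workers must be positive")
    assignment: Dict[int, List[ByteRange]] = {w: [] for w in range(num_workers)}
    for i, byte_range in enumerate(ranges):
        assignment[i % num_workers].append(byte_range)
    return assignment


if __name__ == "__main__":
    # Hypothetical sizes (in bytes) of files matched by a pattern such as hdfs://inputpath/*
    sizes = {"part-0.avro": 250, "part-1.avro": 130}
    work = distribute(expand_into_ranges(sizes, desired_bundle_size=64), num_workers=10)
    busy = [w for w, assigned in work.items() if assigned]
    print(f"{len(busy)} of {len(work)} workers have ranges to read")
```

With these made-up sizes the two files become seven ranges, so seven of the ten workers get something to read. If the work is never split and redistributed, everything stays on one worker, which matches the single busy executor described in the original question.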
Thanks
Ajo

On Fri, Jun 11, 2021 at 8:18 AM Ajo Thomas <ajo.thoma...@gmail.com> wrote:
> Hi folks,
>
> I am working on running a Portable Python pipeline on Spark.
> The test pipeline is very straightforward where I am trying to read some
> avro data in hdfs using avroio (native io and not an external transform)
> and write it back to hdfs. Here is the pipeline:
>
> Pipeline:
> pipeline_options = get_pipeline_options()
> input = "hdfs://inputpath/*"
> schema = get_schema(input)
> output = "hdfs://outputpath/out"
> """Pipeline"""
> p = Pipeline(options=pipeline_options)
> (p
>  | 'read' >> beam.io.ReadFromAvro(input)
>  | 'write' >> beam.io.WriteToAvro(file_path_prefix=output, schema=schema,
>                                   file_name_suffix=".avro"))
> p.run().wait_until_finish()
>
> Here is the dot rendered representation of the fused pipeline from
> SparkPortableRunner:
> {
>   rankdir=LR
>   0 [label="read/Read/Impulse\nbeam:transform:impulse:v1"]
>   1 [label="17read/Read/Impulse.None/beam:env:process:v1:0\nbeam:runner:executable_stage:v1"]
>   0 -> 1 [style=solid label="1"]
>   2 [label="40read/Read/Map(<lambda at iobase.py:899>).None/SplitAndSize0/beam:env:process:v1:0\nbeam:runner:executable_stage:v1"]
>   1 -> 2 [style=solid label="2/SplitAndSize0"]
> }
>
> I have assigned about 10 executors for the pipeline run. I also have added
> some logs in the io to see how the io is splitting the bundles. I can see
> that all the reads are happening through a single executor while other
> executors are idle. Any pointers to resolve this would be appreciated.
>
> Thanks
> Ajo