Update for anyone who might run into this:

I was able to fix this by using beam.io.ReadAllFromAvro, applied to a
PCollection created from [input], instead of beam.io.ReadFromAvro(input).
After some digging I realized that `ReadFromAvro` relies on splittable
DoFns to split reads, and splittable DoFns are not supported in SparkRunner.
Switching to `ReadAllFromAvro` worked because it relies on
filebasedsource.ReadAllFiles, which uses a different approach to splitting
the work.
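
For reference, here is a minimal sketch of the change, assuming the rest of
the pipeline stays as it was. ReadAllFromAvro takes a PCollection of file
patterns as its input, so the pattern is wrapped with beam.Create first. The
helper name read_avro_records and its use_read_all flag are made up for
illustration and are not part of Beam.

```python
def read_avro_records(p, file_pattern, use_read_all=True):
    """Return a PCollection of Avro records matching file_pattern.

    Hypothetical helper for illustration only; `p` is a beam.Pipeline.
    """
    # Imported inside the function so this module loads without Beam installed.
    import apache_beam as beam

    if use_read_all:
        # ReadAllFromAvro consumes a PCollection of file patterns, so the
        # single pattern is turned into a one-element PCollection first.
        return (
            p
            | 'patterns' >> beam.Create([file_pattern])
            | 'read' >> beam.io.ReadAllFromAvro()
        )

    # Original approach: the pattern is passed directly to the source.
    return p | 'read' >> beam.io.ReadFromAvro(file_pattern)
```

In the pipeline quoted below, read_avro_records(p, input) would take the
place of the 'read' step, with the 'write' step applied to its result
unchanged.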


On Fri, Jun 11, 2021 at 8:18 AM Ajo Thomas <ajo.thoma...@gmail.com> wrote:

> Hi folks,
> I am working on running a portable Python pipeline on Spark.
> The test pipeline is very straightforward: it reads some Avro data from
> HDFS using avroio (the native IO, not an external transform) and writes
> it back to HDFS. Here is the pipeline:
> Pipeline:
>
>     pipeline_options = get_pipeline_options()
>     input = "hdfs://inputpath/*"
>     schema = get_schema(input)
>     output = "hdfs://outputpath/out"
>     """Pipeline"""
>     p = Pipeline(options=pipeline_options)
>     (p
>      | 'read' >> beam.io.ReadFromAvro(input)
>      | 'write' >> beam.io.WriteToAvro(file_path_prefix=output, schema=schema,
>                                       file_name_suffix=".avro"))
>     p.run().wait_until_finish()
>
> Here is the dot rendered representation of the fused pipeline from
> SparkPortableRunner:
> {
>   rankdir=LR
>   0 [label="read/Read/Impulse\nbeam:transform:impulse:v1"]
>   1 [label="17read/Read/Impulse.None/beam:env:process:v1:0\nbeam:runner:executable_stage:v1"]
>   0 -> 1 [style=solid label="1"]
>   2 [label="40read/Read/Map(<lambda at iobase.py:899>).None/SplitAndSize0/beam:env:process:v1:0\nbeam:runner:executable_stage:v1"]
>   1 -> 2 [style=solid label="2/SplitAndSize0"]
> }
> I have assigned about 10 executors to the pipeline run. I have also added
> some logging in the IO to see how it splits the bundles. I can see that
> all the reads happen on a single executor while the other executors sit
> idle. Any pointers to resolve this would be appreciated.
> Thanks
> Ajo

