Hi folks,

I am working on running a Portable Python pipeline on Spark.
The test pipeline is very straightforward: it reads some Avro data from HDFS
using avroio (the native IO, not an external transform) and writes it back
to HDFS. Here is the pipeline:

Pipeline:

import apache_beam as beam

pipeline_options = get_pipeline_options()  # helper that builds the PipelineOptions
input = "hdfs://inputpath/*"
schema = get_schema(input)  # helper that reads the Avro schema from the input files
output = "hdfs://outputpath/out"

p = beam.Pipeline(options=pipeline_options)
(p
 | 'read' >> beam.io.ReadFromAvro(input)
 | 'write' >> beam.io.WriteToAvro(file_path_prefix=output, schema=schema,
                                  file_name_suffix=".avro"))
p.run().wait_until_finish()
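
For context, get_pipeline_options() builds the PipelineOptions roughly like the
sketch below; the job endpoint, environment config, and HDFS values here are
placeholders rather than the actual ones from my setup:

from apache_beam.options.pipeline_options import PipelineOptions

def get_pipeline_options():
    # Placeholder values; the real job points at our cluster's endpoints.
    return PipelineOptions([
        "--runner=PortableRunner",
        "--job_endpoint=localhost:8099",   # Spark job server endpoint
        "--environment_type=PROCESS",      # matches beam:env:process:v1 in the fused graph
        '--environment_config={"command": "/path/to/boot"}',
        "--hdfs_host=namenode",
        "--hdfs_port=8020",
        "--hdfs_user=hadoop",
    ])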


Here is the dot-rendered representation of the fused pipeline from the
SparkPortableRunner:
{
rankdir=LR
0 [label="read/Read/Impulse\nbeam:transform:impulse:v1"]
1 [label="17read/Read/Impulse.None/beam:env:process:v1:0\n
beam:runner:executable_stage:v1"]
0 -> 1 [style=solid label="1"]
2 [label="40read/Read/Map(<lambda at
iobase.py:899>).None/SplitAndSize0/beam:env:process:v1:0\n
beam:runner:executable_stage:v1"]
1 -> 2 [style=solid label="2/SplitAndSize0"]
}

I have assigned about 10 executors for the pipeline run, and I have also added
some logging in the IO to see how it is splitting the bundles. I can see that
all the reads happen through a single executor while the other executors sit
idle. Any pointers to resolve this would be appreciated.
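
The logging I mention is roughly like the sketch below; the DoFn and its label
are just illustrative placeholders, not the exact code from the job. It logs
the executor host at the start of each bundle so the distribution of bundles
across executors shows up in the worker logs:

import logging
import socket

import apache_beam as beam

class LogBundleHost(beam.DoFn):
    """Illustrative DoFn that records which host processes each bundle."""

    def start_bundle(self):
        # One log line per bundle, tagged with the executor's hostname.
        logging.info("Bundle started on host %s", socket.gethostname())

    def process(self, element):
        yield element

# Plugged in between the read and the write, e.g.:
#   | 'log_host' >> beam.ParDo(LogBundleHost())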

Thanks
Ajo
