[ 
https://issues.apache.org/jira/browse/BEAM-9434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismaël Mejía updated BEAM-9434:
-------------------------------
    Summary: Improve Spark runner reshuffle translation to maximize parallelism 
 (was: Performance improvements processing a large number of Avro files in 
S3+Spark)

> Improve Spark runner reshuffle translation to maximize parallelism
> ------------------------------------------------------------------
>
>                 Key: BEAM-9434
>                 URL: https://issues.apache.org/jira/browse/BEAM-9434
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-spark
>    Affects Versions: 2.19.0
>            Reporter: Emiliano Capoccia
>            Assignee: Emiliano Capoccia
>            Priority: Minor
>             Fix For: 2.21.0
>
>          Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> There is a performance issue when processing a large number (tens of 
> thousands or more) of small Avro files in Spark on k8s.
> The recommended way of reading Avro files that match a pattern in Beam is:
>  
> {code:java}
> PCollection<AvroGenClass> records = p.apply(
>     AvroIO.read(AvroGenClass.class)
>         .from("s3://my-bucket/path-to/*.avro")
>         .withHintMatchesManyFiles());
> {code}
> However, with many small files, the above results in the entire read taking 
> place in a single task/node, which is very slow and does not scale.
> Omitting the hint is not a viable option, as it results in too many tasks 
> being spawned, leaving the cluster busy coordinating tiny tasks with high 
> overhead.
> There are a few workarounds on the internet which mainly revolve around 
> compacting the input files before processing, so that a reduced number of 
> bulky files is processed in parallel.
> It seems the Spark runner uses the parallelism (number of partitions) of the 
> input distributed collection (RDD) to calculate the number of partitions in 
> Reshuffle. In the case of FileIO/AvroIO, if the input is a file pattern, the 
> size of the input is 1 (the pattern itself), which is far from an optimal 
> parallelism value. We may fix this by improving the translation of Reshuffle.
>  
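
To make the partitioning problem described above concrete, here is a minimal 
sketch of one possible heuristic: never shuffle into fewer partitions than 
the cluster's default parallelism. The class ReshufflePartitions and its 
method choose are hypothetical names used only for illustration. They are not 
part of Beam or the Spark runner, and this is not necessarily how the issue 
was fixed. The defaultParallelism argument is assumed to be the value that 
Spark's SparkContext.defaultParallelism() would report.

```java
/** Hypothetical helper for illustration; not part of Beam or the Spark runner. */
final class ReshufflePartitions {
    private ReshufflePartitions() {}

    /**
     * Picks a partition count for the output of a reshuffle.
     *
     * @param inputPartitions partitions of the upstream RDD (1 when the
     *     input is a single file pattern, as in the description)
     * @param defaultParallelism assumed cluster-wide default parallelism
     * @return the larger of the two values
     */
    static int choose(int inputPartitions, int defaultParallelism) {
        if (inputPartitions < 1 || defaultParallelism < 1) {
            throw new IllegalArgumentException("partition counts must be >= 1");
        }
        // Keeping the input's partitioning is only sensible when it is
        // already at least as wide as the cluster can process in parallel.
        return Math.max(inputPartitions, defaultParallelism);
    }

    public static void main(String[] args) {
        // A single-element input no longer pins the shuffle to one partition.
        System.out.println(choose(1, 64));
        // An input that is already wide keeps its partitioning.
        System.out.println(choose(200, 64));
    }
}
```

Under this assumption, a single-element input on a cluster with a default 
parallelism of 64 would be shuffled into 64 partitions instead of 1, while an 
input that already has 200 partitions would be left at 200.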



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
