[ https://issues.apache.org/jira/browse/BEAM-9434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ismaël Mejía updated BEAM-9434:
-------------------------------
    Summary: Improve Spark runner reshuffle translation to maximize parallelism  (was: Performance improvements processing a large number of Avro files in S3+Spark)

> Improve Spark runner reshuffle translation to maximize parallelism
> ------------------------------------------------------------------
>
>                 Key: BEAM-9434
>                 URL: https://issues.apache.org/jira/browse/BEAM-9434
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-spark
>    Affects Versions: 2.19.0
>            Reporter: Emiliano Capoccia
>            Assignee: Emiliano Capoccia
>            Priority: Minor
>             Fix For: 2.21.0
>
>          Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> There is a performance issue when processing a large number of small Avro
> files (tens of thousands or more) in Spark on k8s.
> The recommended way of reading a pattern of Avro files in Beam is:
>
> {code:java}
> PCollection<AvroGenClass> records = p.apply(AvroIO.read(AvroGenClass.class)
>     .from("s3://my-bucket/path-to/*.avro").withHintMatchesManyFiles());
> {code}
> However, in the case of many small files, the above results in the entire
> read taking place in a single task/node, which is slow and does not scale.
> Omitting the hint is not viable, as it results in too many tasks being
> spawned, leaving the cluster busy coordinating tiny tasks with high
> overhead.
> There are a few workarounds on the internet, which mainly revolve around
> compacting the input files before processing, so that a smaller number of
> larger files is processed in parallel.
> It seems the Spark runner uses the parallelism of the input distributed
> collection (RDD) to calculate the number of partitions in Reshuffle. In the
> case of FileIO/AvroIO, if the input is a file pattern, the size of the input
> is 1, which is far from an optimal parallelism value. We may fix this by
> improving the translation of Reshuffle.
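
The partitioning problem described above can be illustrated with a small, self-contained sketch. It is not the Spark runner's code and does not claim to be the fix that was applied: the class `ReshufflePartitions`, the helpers `choosePartitions` and `distribute`, and the parallelism value of 8 are all hypothetical. It only shows one plausible heuristic, taking the larger of the input's partition count and the cluster's default parallelism, and what that does to a single-partition input.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; none of these names come from Beam or Spark.
final class ReshufflePartitions {

  /**
   * Assumed heuristic: never shuffle into fewer partitions than the cluster's
   * default parallelism, even when the input arrives in a single partition.
   */
  public static int choosePartitions(int inputPartitions, int defaultParallelism) {
    if (inputPartitions < 1 || defaultParallelism < 1) {
      throw new IllegalArgumentException("partition counts must be positive");
    }
    return Math.max(inputPartitions, defaultParallelism);
  }

  /** Round-robin assignment of elements to partitions, standing in for the shuffle. */
  public static <T> List<List<T>> distribute(List<T> elements, int numPartitions) {
    List<List<T>> partitions = new ArrayList<>();
    for (int i = 0; i < numPartitions; i++) {
      partitions.add(new ArrayList<>());
    }
    for (int i = 0; i < elements.size(); i++) {
      partitions.get(i % numPartitions).add(elements.get(i));
    }
    return partitions;
  }

  public static void main(String[] args) {
    int inputPartitions = 1;     // a file pattern starts as one element in one partition
    int defaultParallelism = 8;  // assumed cluster setting

    // Stand-in for the files matched by the pattern.
    List<String> files = new ArrayList<>();
    for (int i = 0; i < 20; i++) {
      files.add("file-" + i + ".avro");
    }

    // Reusing the input's partition count keeps every file in one task.
    System.out.println("input-based partitions: "
        + distribute(files, inputPartitions).size());

    // The assumed heuristic spreads the files across the available parallelism.
    int proposed = choosePartitions(inputPartitions, defaultParallelism);
    System.out.println("proposed partitions: " + distribute(files, proposed).size());
  }
}
```

Taking the maximum leaves inputs that are already widely partitioned untouched and only widens the narrow ones, which is the case the issue is about.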