[ https://issues.apache.org/jira/browse/BEAM-9434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17050688#comment-17050688 ]
Emiliano Capoccia edited comment on BEAM-9434 at 3/5/20, 9:41 PM:
------------------------------------------------------------------

In the case outlined, with a large number of very small (KB-sized) Avro files, the idea is to expose a new hint in the AvroIO class that reads the input files with a predetermined number of parallel tasks.

Both extremes, a very high or a very low number of tasks, should be avoided, as both are suboptimal for performance: too many tasks lead to very high overhead, whereas too few (or a single one) serialise the reading on a few nodes and leave the cluster underutilised.

In my tests I read 6578 Avro files from S3, each containing a single record.

With the proposed pull request #11037 and 10 partitions, the time to read the files improved from 16 minutes to 2.3 minutes. Even more importantly, the memory used by each node is roughly 1/10th of that in the single-node case.

*Reference run*: 6578 files, 1 task/executor, shuffle read 164 KB, 6578 records, shuffle write 58 MB, 16 minutes execution time.

*PR #11037*: 10 tasks/executors, 660 files per task on average, totalling 6578; 23 KB average shuffle read per task, 6 MB average shuffle write per task, 2.3 minutes execution time per executor, running in parallel.


was (Author: ecapoccia):
In the case outlined of a large number of very small (kb) avro files, the idea is to expose a new hint in the AvroIO class that can handle the reading of the input files with a pre determined number of parallel tasks.

Both extremes of having a very high or a very low number of tasks should be avoided, as they are suboptimal in terms of performance: too many tasks yield to very high overhead whereas a too few tasks (or a single one) result in an unacceptable serialisation of reading on too little node, with the cluster being under utilised.

In the tests that I carried out, I was reading 6578 Avro files from S3, each containing a single record.
The performance of the reading using the proposed pull request #11037 improved using 10 partitions, from 16 minutes to 2.3 minutes for performing the same exact work. Even more importantly, the memory used by every node is 1/10th roughly of the case with a single node.

*Reference run*, 6578 files, 1 task/executor, shuffle read 164kb, 6578 records, shuffle write 58Mb, 16 minutes execution time.

*PR #11037*, 10 tasks/executors, 660 files per task average, totalling 6578; 23kb average shuffle read per task, 6 Mb average shuffle write per task, 2.3 minutes execution time per executor in parallel.


> Performance improvements processing a large number of Avro files in S3+Spark
> ----------------------------------------------------------------------------
>
>                 Key: BEAM-9434
>                 URL: https://issues.apache.org/jira/browse/BEAM-9434
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-aws, sdk-java-core
>    Affects Versions: 2.19.0
>            Reporter: Emiliano Capoccia
>            Assignee: Emiliano Capoccia
>            Priority: Minor
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> There is a performance issue when processing a large number of small Avro
> files (tens of thousands or more) in Spark on K8S.
> The recommended way of reading a pattern of Avro files in Beam is by means of:
>
> {code:java}
> PCollection<AvroGenClass> records = p.apply(AvroIO.read(AvroGenClass.class)
>     .from("s3://my-bucket/path-to/*.avro").withHintMatchesManyFiles());
> {code}
> However, in the case of many small files, the above results in the entire
> read taking place in a single task/node, which is very slow and does not
> scale.
> Omitting the hint is not a viable option, as it results in too many tasks
> being spawned and the cluster being busy coordinating tiny tasks with high
> overhead.
> There are a few workarounds on the internet, which mainly revolve around
> compacting the input files before processing, so that a smaller number of
> bulky files is processed in parallel.
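
To make the "predetermined number of parallel tasks" idea from the comment concrete, here is a minimal plain-Java sketch of spreading a list of input files over a fixed number of buckets. It is an illustration only and not the code in PR #11037: the class name `FixedPartitionSketch`, the `partition` helper, the round-robin assignment and the generated file names are all assumptions made for this example, and no Beam or Spark API is involved.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Beam: assigns items to a fixed number of buckets.
class FixedPartitionSketch {

    /** Distributes items round-robin over exactly numPartitions buckets. */
    static <T> List<List<T>> partition(List<T> items, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        List<List<T>> buckets = new ArrayList<>(numPartitions);
        for (int i = 0; i < numPartitions; i++) {
            buckets.add(new ArrayList<>());
        }
        // Item i goes to bucket i mod numPartitions, so bucket sizes differ by at most one.
        for (int i = 0; i < items.size(); i++) {
            buckets.get(i % numPartitions).add(items.get(i));
        }
        return buckets;
    }

    public static void main(String[] args) {
        // Made-up file names standing in for the 6578 small Avro files of the test.
        List<String> files = new ArrayList<>();
        for (int i = 0; i < 6578; i++) {
            files.add("s3://my-bucket/path-to/file-" + i + ".avro");
        }
        List<List<String>> tasks = partition(files, 10);
        for (int t = 0; t < tasks.size(); t++) {
            System.out.println("task " + t + ": " + tasks.get(t).size() + " files");
        }
    }
}
```

With 6578 files and 10 buckets, each bucket receives 657 or 658 files, in line with the roughly 660 files per task reported above. Round-robin is just one simple way to get evenly sized buckets; a hash of the file name would serve the same purpose.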