[ 
https://issues.apache.org/jira/browse/BEAM-9434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17050688#comment-17050688
 ] 

Emiliano Capoccia edited comment on BEAM-9434 at 3/5/20, 9:41 PM:
------------------------------------------------------------------

For the case outlined here, a large number of very small (KB-sized) Avro files, 
the idea is to expose a new hint in the AvroIO class that reads the input files 
with a predetermined number of parallel tasks.

Both extremes, a very high or a very low number of tasks, should be avoided, 
as they are suboptimal in terms of performance: too many tasks incur very high 
overhead, whereas too few (or a single one) serialise the reading on a few 
nodes and leave the cluster underutilised.
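
A minimal sketch of the underlying idea, in plain Java with no Beam 
dependency: the matched files are spread over a fixed number of buckets, and 
each bucket would then be read by one task. The class name, the method name 
and the file names are hypothetical, and round-robin assignment is an 
assumption made for illustration; this is not the API or the implementation 
proposed in the pull request.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: spreads input files over a fixed number of partitions. */
final class FixedPartitionSketch {

    /**
     * Assigns each file to one of numPartitions buckets in round-robin order.
     * numPartitions plays the role of the predetermined number of tasks.
     */
    static List<List<String>> partition(List<String> files, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        List<List<String>> buckets = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            buckets.add(new ArrayList<>());
        }
        for (int i = 0; i < files.size(); i++) {
            // File i goes to bucket (i mod numPartitions).
            buckets.get(i % numPartitions).add(files.get(i));
        }
        return buckets;
    }

    public static void main(String[] args) {
        // Hypothetical file names, used only to exercise the method.
        List<String> files = new ArrayList<>();
        for (int i = 0; i < 25; i++) {
            files.add("s3://my-bucket/path-to/part-" + i + ".avro");
        }
        List<List<String>> buckets = partition(files, 4);
        for (int i = 0; i < buckets.size(); i++) {
            System.out.println("partition " + i + ": " + buckets.get(i).size() + " files");
        }
    }
}
```

With a fixed bucket count, the number of tasks no longer depends on the 
number of files, which is what avoids both extremes described above.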

In my tests I read 6578 Avro files from S3, each containing a single record.

With the proposed pull request #11037 and 10 partitions, the time to read the 
files dropped from 16 minutes to 2.3 minutes.

Even more importantly, the memory used by every node is roughly 1/10th of that 
used in the single-node case.

*Reference run*: 6578 files, 1 task/executor, shuffle read 164 KB, 6578 
records, shuffle write 58 MB, 16 minutes execution time.

*PR #11037*: 10 tasks/executors, 660 files per task on average, totalling 6578; 
23 KB average shuffle read per task, 6 MB average shuffle write per task, 2.3 
minutes execution time per executor in parallel.
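
As a back-of-the-envelope check of these figures, the following plain-Java 
sketch computes the average number of files per task and the overall speedup 
from the numbers reported for the two runs. The class and method names are 
hypothetical, and the calculation assumes the files are spread evenly across 
the tasks.

```java
/** Illustrative only: arithmetic on the figures reported for the two runs. */
final class RunComparisonSketch {

    /** Average number of files each task reads when files are spread evenly. */
    static double filesPerTask(int totalFiles, int tasks) {
        return (double) totalFiles / tasks;
    }

    /** Ratio of the reference execution time to the improved one. */
    static double speedup(double referenceMinutes, double improvedMinutes) {
        return referenceMinutes / improvedMinutes;
    }

    public static void main(String[] args) {
        // 6578 files over 10 tasks; 16 minutes versus 2.3 minutes.
        System.out.printf("files per task: %.1f%n", filesPerTask(6578, 10));
        System.out.printf("speedup: %.2fx%n", speedup(16.0, 2.3));
    }
}
```

This works out to roughly 658 files per task and a speedup of about 7x, 
consistent with the per-task averages quoted above.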



> Performance improvements processing a large number of Avro files in S3+Spark
> ----------------------------------------------------------------------------
>
>                 Key: BEAM-9434
>                 URL: https://issues.apache.org/jira/browse/BEAM-9434
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-aws, sdk-java-core
>    Affects Versions: 2.19.0
>            Reporter: Emiliano Capoccia
>            Assignee: Emiliano Capoccia
>            Priority: Minor
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> There is a performance issue when processing a large number (tens of 
> thousands or more) of small Avro files in Spark on K8S.
> The recommended way of reading a pattern of Avro files in Beam is by means of:
>  
> {code:java}
> PCollection<AvroGenClass> records = p.apply(
>     AvroIO.read(AvroGenClass.class)
>         .from("s3://my-bucket/path-to/*.avro")
>         .withHintMatchesManyFiles());
> {code}
> However, in the case of many small files, the above results in the entire 
> read taking place in a single task/node, which is considerably slow and 
> does not scale.
> Omitting the hint is not a viable option, as it results in too many tasks 
> being spawned and the cluster being busy coordinating tiny tasks with high 
> overhead.
> There are a few workarounds on the internet which mainly revolve around 
> compacting the input files before processing, so that a reduced number of 
> bulky files is processed in parallel.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
