[jira] [Comment Edited] (BEAM-9434) Performance improvements processing a large number of Avro files in S3+Spark

2020-03-07 Thread Emiliano Capoccia (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-9434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17053885#comment-17053885
 ] 

Emiliano Capoccia edited comment on BEAM-9434 at 3/7/20, 9:30 AM:
--

Both.

Reading the metadata wouldn't be a problem; it also happens on every node in 
the proposed PR.
But the actual reading also happens on a single node, with unacceptably high 
reading times.
What you say may apply to the case of "bulky" files.
However, my solution specifically targets the case where there is a very high 
number of tiny files (explained in more detail in the Jira ticket).
In this latter case the latency of reading each file from S3 dominates, but no 
chunking/shuffling happens with standard Beam.

When I look at the DAG in Spark, I can see only one task there, and if I look 
at the executors they are all idle except the one where all the reading 
happens. This is true both for the stage where the metadata is read and for 
the stage where the data is read.

With the proposed PR, instead, the number of tasks and parallel executors in 
the DAG matches the value passed in the hint.
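To illustrate the mechanism (a minimal plain-Java sketch under stated assumptions: `assignToTasks` and the hash-bucketing scheme are hypothetical illustrations, not the actual PR code), a parallelism hint of N can be realised by hashing each matched file path into one of N buckets, so each bucket becomes one reading task no matter how many tiny files match:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartitionHint {
    // Hypothetical helper: map many small file paths onto at most
    // `parallelism` reading tasks by hashing each path into a bucket.
    static Map<Integer, List<String>> assignToTasks(List<String> files, int parallelism) {
        Map<Integer, List<String>> buckets = new HashMap<>();
        for (String file : files) {
            int key = Math.floorMod(file.hashCode(), parallelism);
            buckets.computeIfAbsent(key, k -> new ArrayList<>()).add(file);
        }
        return buckets;
    }

    public static void main(String[] args) {
        List<String> files = new ArrayList<>();
        for (int i = 0; i < 6578; i++) {
            files.add("s3://my-bucket/path-to/part-" + i + ".avro");
        }
        Map<Integer, List<String>> tasks = assignToTasks(files, 10);
        // Never more tasks than the hint, and no file is lost.
        System.out.println(tasks.size() <= 10);
        System.out.println(tasks.values().stream().mapToInt(List::size).sum());
    }
}
```

In Spark terms, each bucket corresponds to one task/executor after the shuffle, which then reads its share of files; this is why the DAG shows exactly the hinted number of parallel tasks.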



> Performance improvements processing a large number of Avro files in S3+Spark
> 
>
> Key: BEAM-9434
> URL: https://issues.apache.org/jira/browse/BEAM-9434
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-aws, sdk-java-core
>Affects Versions: 2.19.0
>Reporter: Emiliano Capoccia
>Assignee: Emiliano Capoccia
>Priority: Minor
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> There is a performance issue when processing a large number of small Avro 
> files (tens of thousands or more) in Spark on K8S.
> The recommended way of reading a pattern of Avro files in Beam is by means of:
>  
> {code:java}
> PCollection<AvroGenClass> records = p.apply(AvroIO.read(AvroGenClass.class)
>     .from("s3://my-bucket/path-to/*.avro").withHintMatchesManyFiles());
> {code}
> However, in the case of many small files, the above results in the entire 
> reading taking place in a single task/node, which is considerably slow and 
> has scalability issues.
> The option of omitting the hint is not viable, as it results in too many 
> tasks being spawned and the cluster being busy coordinating tiny tasks with 
> high overhead.
> There are a few workarounds on the internet which mainly revolve around 
> compacting the input files before processing, so that a reduced number of 
> bulky files is processed in parallel.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-9434) Performance improvements processing a large number of Avro files in S3+Spark

2020-03-06 Thread Emiliano Capoccia (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-9434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17053885#comment-17053885
 ] 

Emiliano Capoccia commented on BEAM-9434:
-

Both.
Reading the metadata wouldn't be a problem; it also happens on every node in 
the proposed PR.
But the actual reading also happens on a single node, with unacceptably high 
reading times.
What you say applies to the case of "bulky" files: for those, the shuffling 
stage chunks the files and shuffles the reading of each chunk.
However, my solution specifically targets the case where there is a very high 
number of tiny files (explained in more detail in the Jira ticket).
In this latter case the latency of reading each file from S3 dominates, but no 
chunking/shuffling happens with standard Beam.
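The latency-dominated regime can be sketched with a toy model: total read time ≈ number of files × per-file latency ÷ parallel tasks. The 146 ms per-file S3 round-trip below is a hypothetical figure chosen only to make the single-task case land near the 16 minutes reported elsewhere in this thread; it is not a measurement:

```java
public class LatencyModel {
    public static void main(String[] args) {
        int files = 6578;             // small Avro files, one record each
        double latencyMs = 146.0;     // hypothetical S3 per-file round-trip
        // All reads on a single task: per-file latency adds up serially.
        long oneTaskMinutes = Math.round(files * latencyMs / 1000 / 60);
        // Spread over 10 parallel tasks: latency is paid concurrently.
        long tenTaskMinutes = Math.round(files * latencyMs / 10 / 1000 / 60);
        System.out.println(oneTaskMinutes);
        System.out.println(tenTaskMinutes);
    }
}
```

Under this model, parallelising across 10 tasks shrinks wall-clock time roughly by the parallelism factor, which is the effect the proposed hint is after.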



[jira] [Comment Edited] (BEAM-9434) Performance improvements processing a large number of Avro files in S3+Spark

2020-03-05 Thread Emiliano Capoccia (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-9434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17050688#comment-17050688
 ] 

Emiliano Capoccia edited comment on BEAM-9434 at 3/5/20, 9:41 PM:
--

In the outlined case of a large number of very small (KB-sized) Avro files, the 
idea is to expose a new hint in the AvroIO class that handles reading the input 
files with a predetermined number of parallel tasks.

Both extremes of a very high or a very low number of tasks should be avoided, 
as they are suboptimal for performance: too many tasks yield very high 
overhead, whereas too few (or a single one) serialise the reading onto a few 
nodes, leaving the cluster under-utilised.

In my tests I read 6578 Avro files from S3, each containing a single record.

Reading performance with the proposed pull request #11037 improved from 16 
minutes to 2.3 minutes using 10 partitions.

Even more importantly, the memory used by each node is roughly one tenth of 
that in the single-node case.

*Reference run*: 6578 files, 1 task/executor, 164 KB shuffle read, 6578 
records, 58 MB shuffle write, 16 minutes execution time.

*PR #11037*: 10 tasks/executors, ~660 files per task on average (6578 in 
total); 23 KB average shuffle read and 6 MB average shuffle write per task; 
2.3 minutes execution time per executor, in parallel.
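As a quick consistency check on these figures (plain arithmetic, not data from the ticket):

```java
public class SpeedupCheck {
    public static void main(String[] args) {
        // Reported wall-clock times for reading 6578 files.
        double referenceMinutes = 16.0;   // single task/executor
        double prMinutes = 2.3;           // PR #11037 with 10 partitions
        // Speedup, rounded to one decimal place.
        System.out.println(Math.round(referenceMinutes / prMinutes * 10) / 10.0);
        // Shuffle write per task: 58 MB total spread over 10 tasks.
        System.out.println(58.0 / 10);
    }
}
```

That is, about a 7x speedup with 10 partitions, and about 5.8 MB of shuffle write per task, in line with the ~6 MB average quoted above.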





[jira] [Updated] (BEAM-9434) Performance improvements processing a large number of Avro files in S3+Spark

2020-03-05 Thread Emiliano Capoccia (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Emiliano Capoccia updated BEAM-9434:

Description: 
There is a performance issue when processing a large number of small Avro files 
(tens of thousands or more) in Spark on K8S.

The recommended way of reading a pattern of Avro files in Beam is by means of:

 
{code:java}
PCollection<AvroGenClass> records = p.apply(AvroIO.read(AvroGenClass.class)
    .from("s3://my-bucket/path-to/*.avro").withHintMatchesManyFiles());
{code}
However, in the case of many small files, the above results in the entire 
reading taking place in a single task/node, which is considerably slow and has 
scalability issues.

The option of omitting the hint is not viable, as it results in too many tasks 
being spawned and the cluster being busy coordinating tiny tasks with high 
overhead.

There are a few workarounds on the internet which mainly revolve around 
compacting the input files before processing, so that a reduced number of bulky 
files is processed in parallel.

 





[jira] [Updated] (BEAM-9434) Performance improvements processing a large number of Avro files in S3+Spark

2020-03-05 Thread Emiliano Capoccia (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Emiliano Capoccia updated BEAM-9434:

Summary: Performance improvements processing a large number of Avro files 
in S3+Spark  (was: Performance improvements processiong a large number of Avro 
files in S3+Spark)



[jira] [Commented] (BEAM-9434) Performance improvements processiong a large number of Avro files in S3+Spark

2020-03-03 Thread Emiliano Capoccia (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-9434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17050688#comment-17050688
 ] 

Emiliano Capoccia commented on BEAM-9434:
-

In the outlined case of a large number of very small (KB-sized) Avro files, the 
idea is to expose a new hint in the AvroIO class that handles reading the input 
files with a predetermined number of parallel tasks.

Both extremes of a very high or a very low number of tasks should be avoided, 
as they are suboptimal for performance: too many tasks yield very high 
overhead, whereas too few tasks (or a single one) serialise the reading onto 
too few nodes, leaving the cluster under-utilised.

In the tests I carried out, I read 6578 Avro files from S3, each containing a 
single record.

Reading performance with the proposed pull request #11037 improved from 16 
minutes to 2.3 minutes using 10 partitions, for exactly the same work.

Even more importantly, the memory used by each node is roughly one tenth of 
that in the single-node case.

*Reference run*: 6578 files, 1 task/executor, 164 KB shuffle read, 6578 
records, 58 MB shuffle write, 16 minutes execution time.

*PR #11037*: 10 tasks/executors, ~660 files per task on average (6578 in 
total); 23 KB average shuffle read and 6 MB average shuffle write per task; 
2.3 minutes execution time per executor, in parallel.



[jira] [Created] (BEAM-9434) Performance improvements processiong a large number of Avro files in S3+Spark

2020-03-03 Thread Emiliano Capoccia (Jira)
Emiliano Capoccia created BEAM-9434:
---

 Summary: Performance improvements processiong a large number of 
Avro files in S3+Spark
 Key: BEAM-9434
 URL: https://issues.apache.org/jira/browse/BEAM-9434
 Project: Beam
  Issue Type: Improvement
  Components: io-java-aws, sdk-java-core
Affects Versions: 2.19.0
Reporter: Emiliano Capoccia
Assignee: Emiliano Capoccia


There is a performance issue when processing a large number of small Avro files 
(tens of thousands or more) in Spark on K8S.

The recommended way of reading a pattern of Avro files in Beam is by means of:

 
{code:java}
PCollection<AvroGenClass> records = p.apply(AvroIO.read(AvroGenClass.class)
    .from("s3://my-bucket/path-to/*.avro").withHintMatchesManyFiles());
{code}
However, in the case of many small files the above results in the entire 
reading taking place in a single task/node, which is considerably slow and has 
scalability issues.

The option of omitting the hint is not viable, as it results in too many tasks 
being spawned and the cluster busy coordinating tiny tasks with high overhead.

There are a few workarounds on the internet which mainly revolve around 
compacting the input files before processing, so that a reduced number of bulky 
files is processed in parallel.

 


