[jira] [Comment Edited] (BEAM-9434) Performance improvements processing a large number of Avro files in S3+Spark
[ https://issues.apache.org/jira/browse/BEAM-9434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17053885#comment-17053885 ] Emiliano Capoccia edited comment on BEAM-9434 at 3/7/20, 9:30 AM:
--
Both. Reading the metadata wouldn't be a problem: it also happens on every node in the proposed PR. But the actual reading also happens on one node, with unacceptably high reading times. What you say may apply to the case of "bulky" files. However, my solution specifically targets the case of a high number of tiny files (I think I explained this better in the Jira ticket). In that case the latency of reading each file from S3 dominates, but no chunking/shuffling happens with standard Beam. When I look at the DAG in Spark, I can see only one task there, and if I look at the executors they are all idle except the one where all the reading happens. This is true both for the stage where the metadata is read and for the stage where the data is read. With the proposed PR, instead, the number of tasks and parallel executors in the DAG is the one you pass in the hint.

was (Author: ecapoccia):
Both. Reading the metadata wouldn't be a problem: it also happens on every node in the proposed PR. But the actual reading also happens on one node, with unacceptably high reading times. What you say applies to the case of "bulky" files. For those, the shuffling stage chunks the files and shuffles the reading of each chunk. However, my solution specifically targets the case of a high number of tiny files (I think I explained this better in the Jira ticket). In that case the latency of reading each file from S3 dominates, but no chunking/shuffling happens with standard Beam.
> Performance improvements processing a large number of Avro files in S3+Spark
>
> Key: BEAM-9434
> URL: https://issues.apache.org/jira/browse/BEAM-9434
> Project: Beam
> Issue Type: Improvement
> Components: io-java-aws, sdk-java-core
> Affects Versions: 2.19.0
> Reporter: Emiliano Capoccia
> Assignee: Emiliano Capoccia
> Priority: Minor
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
> There is a performance issue when processing a large number of small Avro files in Spark on K8S (tens of thousands or more).
> The recommended way of reading a pattern of Avro files in Beam is by means of:
>
> {code:java}
> PCollection<AvroGenClass> records = p.apply(AvroIO.read(AvroGenClass.class)
>     .from("s3://my-bucket/path-to/*.avro").withHintMatchesManyFiles())
> {code}
> However, in the case of many small files, the above results in the entire reading taking place in a single task/node, which is considerably slow and has scalability issues.
> The option of omitting the hint is not viable, as it results in too many tasks being spawned, and the cluster being busy doing coordination of tiny tasks with high overhead.
> There are a few workarounds on the internet which mainly revolve around compacting the input files before processing, so that a reduced number of bulky files is processed in parallel.
--
This message was sent by Atlassian Jira (v8.3.4#803005)
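The single-task behaviour discussed in the comment above can be illustrated outside Beam. The sketch below is a hypothetical, simplified model of what a partition-count hint does: it assigns file paths round-robin to a fixed number of buckets, so that each bucket can be read by one parallel task instead of all files landing on one node. Class and method names are illustrative only; this is not the PR's actual API nor Beam code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartitionDemo {
    // Assign each file path to one of `numPartitions` buckets round-robin,
    // bounding the number of parallel read tasks regardless of file count.
    public static Map<Integer, List<String>> partition(List<String> files, int numPartitions) {
        Map<Integer, List<String>> buckets = new HashMap<>();
        for (int i = 0; i < files.size(); i++) {
            buckets.computeIfAbsent(i % numPartitions, k -> new ArrayList<>())
                   .add(files.get(i));
        }
        return buckets;
    }

    public static void main(String[] args) {
        // 6578 tiny files, as in the benchmark discussed in this thread.
        List<String> files = new ArrayList<>();
        for (int i = 0; i < 6578; i++) {
            files.add("s3://my-bucket/path-to/part-" + i + ".avro");
        }
        Map<Integer, List<String>> buckets = partition(files, 10);
        int total = buckets.values().stream().mapToInt(List::size).sum();
        int largest = buckets.values().stream().mapToInt(List::size).max().getAsInt();
        // Ten buckets of ~658 files each, instead of one task reading all 6578.
        System.out.println("partitions=" + buckets.size()
                + " totalFiles=" + total + " largest=" + largest);
    }
}
```

With 10 partitions every executor gets roughly an equal share of the small-file read latency, which is the effect the proposed hint aims for.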
[jira] [Commented] (BEAM-9434) Performance improvements processing a large number of Avro files in S3+Spark
[ https://issues.apache.org/jira/browse/BEAM-9434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17053885#comment-17053885 ] Emiliano Capoccia commented on BEAM-9434:
-
Both. Reading the metadata wouldn't be a problem: it also happens on every node in the proposed PR. But the actual reading also happens on one node, with unacceptably high reading times. What you say applies to the case of "bulky" files. For those, the shuffling stage chunks the files and shuffles the reading of each chunk. However, my solution specifically targets the case of a high number of tiny files (I think I explained this better in the Jira ticket). In that case the latency of reading each file from S3 dominates, but no chunking/shuffling happens with standard Beam.
[jira] [Comment Edited] (BEAM-9434) Performance improvements processing a large number of Avro files in S3+Spark
[ https://issues.apache.org/jira/browse/BEAM-9434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17050688#comment-17050688 ] Emiliano Capoccia edited comment on BEAM-9434 at 3/5/20, 9:41 PM:
--
In the outlined case of a large number of very small (KB-sized) Avro files, the idea is to expose a new hint in the AvroIO class that handles the reading of the input files with a predetermined number of parallel tasks. Both extremes, a very high or a very low number of tasks, should be avoided as suboptimal for performance: too many tasks lead to very high coordination overhead, whereas too few (or a single one) serialise the reading onto a few nodes, leaving the cluster under-utilised. In my tests I read 6578 Avro files from S3, each containing a single record. With the proposed pull request #11037 and 10 partitions, the reading time improved from 16 minutes to 2.3 minutes. Even more importantly, the memory used by every node is roughly 1/10th of the single-node case.
*Reference run*: 6578 files, 1 task/executor, shuffle read 164 KB, 6578 records, shuffle write 58 MB, 16 minutes execution time.
*PR #11037*: 10 tasks/executors, ~660 files per task on average, totalling 6578; 23 KB average shuffle read per task, 6 MB average shuffle write per task, 2.3 minutes execution time per executor in parallel.

was (Author: ecapoccia):
In the outlined case of a large number of very small (KB-sized) Avro files, the idea is to expose a new hint in the AvroIO class that handles the reading of the input files with a predetermined number of parallel tasks. Both extremes, a very high or a very low number of tasks, should be avoided as suboptimal for performance: too many tasks lead to very high coordination overhead, whereas too few (or a single one) serialise the reading onto too few nodes, leaving the cluster under-utilised.
In the tests I carried out, I read 6578 Avro files from S3, each containing a single record. Using the proposed pull request #11037 with 10 partitions, the same reading improved from 16 minutes to 2.3 minutes. Even more importantly, the memory used by every node is roughly 1/10th of the single-node case.
*Reference run*: 6578 files, 1 task/executor, shuffle read 164 KB, 6578 records, shuffle write 58 MB, 16 minutes execution time.
*PR #11037*: 10 tasks/executors, ~660 files per task on average, totalling 6578; 23 KB average shuffle read per task, 6 MB average shuffle write per task, 2.3 minutes execution time per executor in parallel.
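As a quick sanity check on the figures reported in the benchmark above (6578 files, 10 partitions, 16 vs. 2.3 minutes), the arithmetic can be reproduced directly. With an even distribution, the "~660 files per task" average corresponds to ceil(6578 / 10) = 658; the class below is a throwaway illustration, not part of the PR.

```java
public class BenchmarkMath {
    // Files per task under even distribution across a fixed partition count.
    public static int filesPerTask(int files, int tasks) {
        return (int) Math.ceil((double) files / tasks);
    }

    // End-to-end speedup from wall-clock times.
    public static double speedup(double minutesBefore, double minutesAfter) {
        return minutesBefore / minutesAfter;
    }

    public static void main(String[] args) {
        int perTask = filesPerTask(6578, 10);          // 658, close to the reported ~660
        long roundedSpeedup = Math.round(speedup(16.0, 2.3)); // roughly 7x
        System.out.println("filesPerTask=" + perTask + " speedup~" + roundedSpeedup + "x");
    }
}
```

A ~7x speedup from 10-way parallelism (rather than 10x) is consistent with some residual per-task overhead and shuffle cost.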
[jira] [Updated] (BEAM-9434) Performance improvements processing a large number of Avro files in S3+Spark
[ https://issues.apache.org/jira/browse/BEAM-9434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Emiliano Capoccia updated BEAM-9434:

Description:
There is a performance issue when processing a large number of small Avro files in Spark on K8S (tens of thousands or more).
The recommended way of reading a pattern of Avro files in Beam is by means of:
{code:java}
PCollection<AvroGenClass> records = p.apply(AvroIO.read(AvroGenClass.class)
    .from("s3://my-bucket/path-to/*.avro").withHintMatchesManyFiles())
{code}
However, in the case of many small files, the above results in the entire reading taking place in a single task/node, which is considerably slow and has scalability issues.
The option of omitting the hint is not viable, as it results in too many tasks being spawned, and the cluster being busy doing coordination of tiny tasks with high overhead.
There are a few workarounds on the internet which mainly revolve around compacting the input files before processing, so that a reduced number of bulky files is processed in parallel.

was:
There is a performance issue when processing in Spark on K8S a large number of small Avro files (tens of thousands or more).
The recommended way of reading a pattern of Avro files in Beam is by means of:
{code:java}
PCollection<AvroGenClass> records = p.apply(AvroIO.read(AvroGenClass.class)
    .from("s3://my-bucket/path-to/*.avro").withHintMatchesManyFiles())
{code}
However, in the case of many small files the above results in the entire reading taking place in a single task/node, which is considerably slow and has scalability issues.
The option of omitting the hint is not viable, as it results in too many tasks being spawned and the cluster being busy doing coordination of tiny tasks with high overhead.
There are a few workarounds on the internet which mainly revolve around compacting the input files before processing, so that a reduced number of bulky files is processed in parallel.
[jira] [Updated] (BEAM-9434) Performance improvements processing a large number of Avro files in S3+Spark
[ https://issues.apache.org/jira/browse/BEAM-9434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Emiliano Capoccia updated BEAM-9434:
Summary: Performance improvements processing a large number of Avro files in S3+Spark (was: Performance improvements processiong a large number of Avro files in S3+Spark)
[jira] [Commented] (BEAM-9434) Performance improvements processiong a large number of Avro files in S3+Spark
[ https://issues.apache.org/jira/browse/BEAM-9434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17050688#comment-17050688 ] Emiliano Capoccia commented on BEAM-9434:
-
In the outlined case of a large number of very small (KB-sized) Avro files, the idea is to expose a new hint in the AvroIO class that handles the reading of the input files with a predetermined number of parallel tasks. Both extremes, a very high or a very low number of tasks, should be avoided as suboptimal for performance: too many tasks lead to very high coordination overhead, whereas too few (or a single one) serialise the reading onto too few nodes, leaving the cluster under-utilised.
In the tests I carried out, I read 6578 Avro files from S3, each containing a single record. Using the proposed pull request #11037 with 10 partitions, the same reading improved from 16 minutes to 2.3 minutes. Even more importantly, the memory used by every node is roughly 1/10th of the single-node case.
*Reference run*: 6578 files, 1 task/executor, shuffle read 164 KB, 6578 records, shuffle write 58 MB, 16 minutes execution time.
*PR #11037*: 10 tasks/executors, ~660 files per task on average, totalling 6578; 23 KB average shuffle read per task, 6 MB average shuffle write per task, 2.3 minutes execution time per executor in parallel.
[jira] [Created] (BEAM-9434) Performance improvements processiong a large number of Avro files in S3+Spark
Emiliano Capoccia created BEAM-9434:
---
Summary: Performance improvements processiong a large number of Avro files in S3+Spark
Key: BEAM-9434
URL: https://issues.apache.org/jira/browse/BEAM-9434
Project: Beam
Issue Type: Improvement
Components: io-java-aws, sdk-java-core
Affects Versions: 2.19.0
Reporter: Emiliano Capoccia
Assignee: Emiliano Capoccia

There is a performance issue when processing in Spark on K8S a large number of small Avro files (tens of thousands or more).
The recommended way of reading a pattern of Avro files in Beam is by means of:
{code:java}
PCollection<AvroGenClass> records = p.apply(AvroIO.read(AvroGenClass.class)
    .from("s3://my-bucket/path-to/*.avro").withHintMatchesManyFiles())
{code}
However, in the case of many small files the above results in the entire reading taking place in a single task/node, which is considerably slow and has scalability issues.
The option of omitting the hint is not viable, as it results in too many tasks being spawned and the cluster being busy doing coordination of tiny tasks with high overhead.
There are a few workarounds on the internet which mainly revolve around compacting the input files before processing, so that a reduced number of bulky files is processed in parallel.
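The compaction workaround mentioned at the end of the description can be sketched as a greedy grouping of small files into bundles of a target combined size, so that a reduced number of bulkier reads runs in parallel. This is a hypothetical illustration of the idea only; it is not Beam code and not one of the actual tools circulating for this purpose.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CompactionPlan {
    // Greedy bin packing: accumulate files into the current bundle until
    // adding the next file would exceed `targetBytes`, then start a new bundle.
    public static List<List<String>> plan(Map<String, Long> fileSizes, long targetBytes) {
        List<List<String>> bundles = new ArrayList<>();
        List<String> current = new ArrayList<>();
        long currentSize = 0;
        for (Map.Entry<String, Long> e : fileSizes.entrySet()) {
            if (!current.isEmpty() && currentSize + e.getValue() > targetBytes) {
                bundles.add(current);
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(e.getKey());
            currentSize += e.getValue();
        }
        if (!current.isEmpty()) bundles.add(current);
        return bundles;
    }

    public static void main(String[] args) {
        // 100 tiny 4 KB files compacted toward 64 KB bundles -> 7 bundles of up to 16 files.
        Map<String, Long> sizes = new LinkedHashMap<>();
        for (int i = 0; i < 100; i++) sizes.put("part-" + i + ".avro", 4096L);
        System.out.println("bundles=" + plan(sizes, 64 * 1024).size());
    }
}
```

The trade-off discussed in this ticket applies here too: the target bundle size sets the effective parallelism, just as the proposed hint sets the number of read tasks directly.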