Re: [DISCUSS] Performance of write() in file based IO
Tim, thanks for digging into this! There are some complexities in fixing the
bug (i.e. Beam currently allows the temp directory to be different from the
target directory), but let's continue the discussion on that JIRA.

Reuven

On Thu, Aug 23, 2018 at 6:05 AM Tim Robertson wrote:

> Thanks for linking this discussion with BEAM-5036 (and transitively to
> BEAM-4861, which also comes into play) Jozek.
>
> What Reuven speculated and Jozef had previously observed is indeed the
> major cause. Today I've been testing the effect of a "move" using rename()
> instead of a copy() and delete().
>
> My test environment is different today but still uses 1.5TB of input data
> and the code I linked earlier in GH [1]:
>
> - Spark API: 35 minutes
> - Beam AvroIO (2.6.0): 1.7 hrs
> - Beam AvroIO with rename() patch: 42 minutes
>
> On the DAG linked in the GH repo [1], stages 3 & 4 are reduced to seconds,
> saving 53 minutes over the Beam 2.6.0 version, which is the predominant
> gain here.
>
> Unless new comments come in I propose fixing BEAM-5036 and BEAM-4861 and
> continuing the discussion on those JIRAs. This requires a bit of
> exploration and decision around the expectations of e.g. the target
> directory not existing, and also correcting the incorrect use of the HDFS
> API (today it ignores the return value, which can indicate an error, e.g.
> when a directory does not exist).
>
> Thank you all for contributing to this discussion.
>
> [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
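Tim's point about the ignored return value can be sketched like this. Hadoop's Java `FileSystem.rename(Path src, Path dst)` returns false rather than throwing when, for example, the destination's parent directory is missing; a "move" that ignores that boolean silently loses output. This is a local-filesystem Python stand-in, not the actual Beam/HDFS code, and `rename_checked` / `rename_unchecked` are hypothetical helper names:

```python
import os


def rename_unchecked(src: str, dst: str) -> bool:
    """Mimics Hadoop's FileSystem.rename(src, dst): reports failure
    (e.g. a missing target directory) via a boolean instead of raising."""
    try:
        os.replace(src, dst)  # a metadata-only move on a local POSIX fs
        return True
    except OSError:
        return False


def rename_checked(src: str, dst: str) -> None:
    """The fix being discussed: fail loudly when the rename is refused,
    rather than silently dropping the output file."""
    if not rename_unchecked(src, dst):
        raise IOError(f"rename failed: {src} -> {dst}")
```

A caller that previously did `fs.rename(src, dst)` and discarded the result would instead go through the checking wrapper, surfacing the "target directory does not exist" case BEAM-5036/BEAM-4861 need to decide on.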
Re: [DISCUSS] Performance of write() in file based IO
Thanks for linking this discussion with BEAM-5036 (and transitively to
BEAM-4861, which also comes into play) Jozek.

What Reuven speculated and Jozef had previously observed is indeed the major
cause. Today I've been testing the effect of a "move" using rename() instead
of a copy() and delete().

My test environment is different today but still uses 1.5TB of input data and
the code I linked earlier in GH [1]:

- Spark API: 35 minutes
- Beam AvroIO (2.6.0): 1.7 hrs
- Beam AvroIO with rename() patch: 42 minutes

On the DAG linked in the GH repo [1], stages 3 & 4 are reduced to seconds,
saving 53 minutes over the Beam 2.6.0 version, which is the predominant gain
here.

Unless new comments come in I propose fixing BEAM-5036 and BEAM-4861 and
continuing the discussion on those JIRAs. This requires a bit of exploration
and decision around the expectations of e.g. the target directory not
existing, and also correcting the incorrect use of the HDFS API (today it
ignores the return value, which can indicate an error, e.g. when a directory
does not exist).

Thank you all for contributing to this discussion.

[1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro

On Thu, Aug 23, 2018 at 11:55 AM Jozef Vilcek wrote:

> Just for reference, there is a JIRA open for
> FileBasedSink.moveToOutputFiles() and filesystem move behavior
>
> https://issues.apache.org/jira/browse/BEAM-5036
Re: [DISCUSS] Performance of write() in file based IO
Just for reference, there is a JIRA open for
FileBasedSink.moveToOutputFiles() and filesystem move behavior:

https://issues.apache.org/jira/browse/BEAM-5036

On Wed, Aug 22, 2018 at 9:15 PM Tim Robertson wrote:

> Reuven, I think you might be on to something.
>
> The Beam HadoopFileSystem copy() does indeed stream through the driver [1],
> and FileBasedSink.moveToOutputFiles() seemingly uses that method [2].
>
> I'll cobble together a patched version to test using a rename() rather than
> a copy() and report back findings before we consider the implications.
>
> Thanks
>
> [1] https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java#L124
> [2] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L288
Re: [DISCUSS] Performance of write() in file based IO
Reuven, I think you might be on to something.

The Beam HadoopFileSystem copy() does indeed stream through the driver [1],
and FileBasedSink.moveToOutputFiles() seemingly uses that method [2].

I'll cobble together a patched version to test using a rename() rather than a
copy() and report back findings before we consider the implications.

Thanks

[1] https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java#L124
[2] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L288

On Wed, Aug 22, 2018 at 8:52 PM Tim Robertson wrote:

> > Does HDFS support a fast rename operation?
>
> Yes. From the shell it is "mv" and in the Java API it is "rename(Path src,
> Path dst)". I am not aware of a fast copy though. I think an HDFS copy
> streams the bytes through the driver (unless a distcp is issued, which is
> an MR job).
>
> (Thanks for engaging in this discussion folks)
Re: [DISCUSS] Performance of write() in file based IO
> Does HDFS support a fast rename operation?

Yes. From the shell it is "mv" and in the Java API it is "rename(Path src,
Path dst)". I am not aware of a fast copy though. I think an HDFS copy
streams the bytes through the driver (unless a distcp is issued, which is an
MR job).

(Thanks for engaging in this discussion folks)

On Wed, Aug 22, 2018 at 6:29 PM Reuven Lax wrote:

> I have another theory: in FileBasedSink.moveToOutputFiles we copy the
> temporary files to the final destination and then delete the temp files.
> Does HDFS support a fast rename operation? If so, I bet Spark is using
> that instead of paying the cost of copying the files.
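The copy-versus-rename distinction under discussion can be sketched on a local filesystem (a Python stand-in, not the Beam/HDFS implementation; the two function names are hypothetical):

```python
import os
import shutil


def move_by_copy(src: str, dst: str) -> None:
    # What moveToOutputFiles effectively paid for in Beam 2.6.0: stream
    # every byte to the destination, then delete the source. Cost grows
    # with file size, and on HDFS the bytes pass through a single
    # copying process (the driver).
    shutil.copyfile(src, dst)
    os.remove(src)


def move_by_rename(src: str, dst: str) -> None:
    # The patched behaviour: a rename is a metadata operation, O(1)
    # regardless of file size (on HDFS it only updates the NameNode's
    # namespace; no bytes move between DataNodes).
    os.replace(src, dst)
```

Both leave the filesystem in the same end state; only the cost differs, which is consistent with stages 3 & 4 collapsing from ~53 minutes to seconds in Tim's test.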
Re: [DISCUSS] Performance of write() in file based IO
I have another theory: in FileBasedSink.moveToOutputFiles we copy the
temporary files to the final destination and then delete the temp files. Does
HDFS support a fast rename operation? If so, I bet Spark is using that
instead of paying the cost of copying the files.

On Wed, Aug 22, 2018 at 8:59 AM Reuven Lax wrote:

> Ismael, that should already be true. If not using dynamic destinations
> there might be some edges in the graph that are never used (i.e. no
> records are ever published on them), but that should not affect
> performance. If this is not the case we should fix it.
>
> Reuven
Re: [DISCUSS] Performance of write() in file based IO
Ismael, that should already be true. If not using dynamic destinations there
might be some edges in the graph that are never used (i.e. no records are
ever published on them), but that should not affect performance. If this is
not the case we should fix it.

Reuven

On Wed, Aug 22, 2018 at 8:50 AM Ismaël Mejía wrote:

> The Spark runner uses the Spark broadcast mechanism to materialize the
> side input PCollections in the workers; not sure exactly if this is
> assigned in an optimal way, but it seems logical at least.
>
> Just wondering if we shouldn't better first tackle the fact that if the
> pipeline does not have dynamic destinations (this case) WriteFiles should
> not be doing so much extra magic?
Re: [DISCUSS] Performance of write() in file based IO
Spark runner uses the Spark broadcast mechanism to materialize the side input PCollections in the workers, not sure exactly if this is efficiently assigned in an optimal way but seems logical at least. Just wondering if we shouldn't better first tackle the fact that if the pipeline does not have dynamic destinations (this case) WriteFiles should not be doing so much extra magic? On Wed, Aug 22, 2018 at 5:26 PM Reuven Lax wrote: > > Often only the metadata (i.e. temp file names) are shuffled, except in the > "spilling" case (which should only happen when using dynamic destinations). > > WriteFiles depends heavily on side inputs. How are side inputs implemented in > the Spark runner? > > On Wed, Aug 22, 2018 at 8:21 AM Robert Bradshaw wrote: >> >> Yes, I stand corrected, dynamic writes is now much more than the >> primitive window-based naming we used to have. >> >> It would be interesting to visualize how much of this codepath is >> metadata vs. the actual data. >> >> In the case of file writing, it seems one could (maybe?) avoid >> requiring a stable input, as shards are accepted as a whole (unlike, >> say, sinks where a deterministic uid is needed for deduplication on >> retry). >> >> On Wed, Aug 22, 2018 at 4:55 PM Reuven Lax wrote: >> > >> > Robert - much of the complexity isn't due to streaming, but rather because >> > WriteFiles supports "dynamic" output (where the user can choose a >> > destination file based on the input record). In practice if a pipeline is >> > not using dynamic destinations the full graph is still generated, but much >> > of that graph is never used (empty PCollections). >> > >> > On Wed, Aug 22, 2018 at 3:12 AM Robert Bradshaw >> > wrote: >> >> >> >> I agree that this is concerning. Some of the complexity may have also >> >> been introduced to accommodate writing files in Streaming mode, but it >> >> seems we should be able to execute this as a single Map operation. 
>> >> >> >> Have you profiled to see which stages and/or operations are taking up the >> >> time? >> >> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson >> >> wrote: >> >> > >> >> > Hi folks, >> >> > >> >> > I've recently been involved in projects rewriting Avro files and have >> >> > discovered a concerning performance trait in Beam. >> >> > >> >> > I have observed Beam between 6-20x slower than native Spark or >> >> > MapReduce code for a simple pipeline of read Avro, modify, write Avro. >> >> > >> >> > - Rewriting 200TB of Avro files (big cluster): 14 hrs using >> >> > Beam/Spark, 40 minutes with a map-only MR job >> >> > - Rewriting 1.5TB Avro file (small cluster): 2 hrs using Beam/Spark, >> >> > 18 minutes using vanilla Spark code. Test code available [1] >> >> > >> >> > These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters (Spark >> >> > / YARN) on reference Dell / Cloudera hardware. >> >> > >> >> > I have only just started exploring but I believe the cause is rooted in >> >> > the WriteFiles which is used by all our file based IO. WriteFiles is >> >> > reasonably complex with reshuffles, spilling to temporary files >> >> > (presumably to accommodate varying bundle sizes/avoid small files), a >> >> > union, a GBK etc. >> >> > >> >> > Before I go too far with exploration I'd appreciate thoughts on whether >> >> > we believe this is a concern (I do), if we should explore optimisations >> >> > or any insight from previous work in this area. >> >> > >> >> > Thanks, >> >> > Tim >> >> > >> >> > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
Re: [DISCUSS] Performance of write() in file based IO
Often only the metadata (i.e. temp file names) are shuffled, except in the "spilling" case (which should only happen when using dynamic destinations). WriteFiles depends heavily on side inputs. How are side inputs implemented in the Spark runner? On Wed, Aug 22, 2018 at 8:21 AM Robert Bradshaw wrote: > Yes, I stand corrected, dynamic writes is now much more than the > primitive window-based naming we used to have. > > It would be interesting to visualize how much of this codepath is > metadata vs. the actual data. > > In the case of file writing, it seems one could (maybe?) avoid > requiring a stable input, as shards are accepted as a whole (unlike, > say, sinks where a deterministic uid is needed for deduplication on > retry). > > On Wed, Aug 22, 2018 at 4:55 PM Reuven Lax wrote: > > > > Robert - much of the complexity isn't due to streaming, but rather > because WriteFiles supports "dynamic" output (where the user can choose a > destination file based on the input record). In practice if a pipeline is > not using dynamic destinations the full graph is still generated, but much > of that graph is never used (empty PCollections). > > > > On Wed, Aug 22, 2018 at 3:12 AM Robert Bradshaw > wrote: > >> > >> I agree that this is concerning. Some of the complexity may have also > >> been introduced to accommodate writing files in Streaming mode, but it > >> seems we should be able to execute this as a single Map operation. > >> > >> Have you profiled to see which stages and/or operations are taking up > the time? > >> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson > >> wrote: > >> > > >> > Hi folks, > >> > > >> > I've recently been involved in projects rewriting Avro files and have > discovered a concerning performance trait in Beam. > >> > > >> > I have observed Beam between 6-20x slower than native Spark or > MapReduce code for a simple pipeline of read Avro, modify, write Avro. 
> >> > > >> > - Rewriting 200TB of Avro files (big cluster): 14 hrs using > Beam/Spark, 40 minutes with a map-only MR job > >> > - Rewriting 1.5TB Avro file (small cluster): 2 hrs using Beam/Spark, > 18 minutes using vanilla Spark code. Test code available [1] > >> > > >> > These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters > (Spark / YARN) on reference Dell / Cloudera hardware. > >> > > >> > I have only just started exploring but I believe the cause is rooted > in the WriteFiles which is used by all our file based IO. WriteFiles is > reasonably complex with reshuffles, spilling to temporary files (presumably > to accommodate varying bundle sizes/avoid small files), a union, a GBK etc. > >> > > >> > Before I go too far with exploration I'd appreciate thoughts on > whether we believe this is a concern (I do), if we should explore > optimisations or any insight from previous work in this area. > >> > > >> > Thanks, > >> > Tim > >> > > >> > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro >
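[Editor's note] Reuven's point that normally only metadata is shuffled can be pictured with a small sketch (illustrative pair shapes and names, not Beam's actual classes): each bundle writes its records straight to a temp file and emits only a (destination, temp file name) pair, so the GBK in finalization groups a handful of short strings regardless of how large the underlying files are.

```python
import collections

# Hypothetical output of three write bundles: the record bytes are already
# on disk in temp files; only these small pairs enter the shuffle.
written = [
    ("logs", "temp-bundle-a"),
    ("logs", "temp-bundle-b"),
    ("events", "temp-bundle-c"),
]

# The finalization GBK, in miniature: group temp file names per destination.
# Shuffle cost is proportional to the number of files, not their size.
by_destination = collections.defaultdict(list)
for destination, temp_name in written:
    by_destination[destination].append(temp_name)
```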
Re: [DISCUSS] Performance of write() in file based IO
Yes, I stand corrected, dynamic writes is now much more than the primitive window-based naming we used to have. It would be interesting to visualize how much of this codepath is metadata vs. the actual data. In the case of file writing, it seems one could (maybe?) avoid requiring a stable input, as shards are accepted as a whole (unlike, say, sinks where a deterministic uid is needed for deduplication on retry). On Wed, Aug 22, 2018 at 4:55 PM Reuven Lax wrote: > > Robert - much of the complexity isn't due to streaming, but rather because > WriteFiles supports "dynamic" output (where the user can choose a destination > file based on the input record). In practice if a pipeline is not using > dynamic destinations the full graph is still generated, but much of that > graph is never used (empty PCollections). > > On Wed, Aug 22, 2018 at 3:12 AM Robert Bradshaw wrote: >> >> I agree that this is concerning. Some of the complexity may have also >> been introduced to accommodate writing files in Streaming mode, but it >> seems we should be able to execute this as a single Map operation. >> >> Have you profiled to see which stages and/or operations are taking up the >> time? >> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson >> wrote: >> > >> > Hi folks, >> > >> > I've recently been involved in projects rewriting Avro files and have >> > discovered a concerning performance trait in Beam. >> > >> > I have observed Beam between 6-20x slower than native Spark or MapReduce >> > code for a simple pipeline of read Avro, modify, write Avro. >> > >> > - Rewriting 200TB of Avro files (big cluster): 14 hrs using Beam/Spark, >> > 40 minutes with a map-only MR job >> > - Rewriting 1.5TB Avro file (small cluster): 2 hrs using Beam/Spark, 18 >> > minutes using vanilla Spark code. Test code available [1] >> > >> > These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters (Spark / >> > YARN) on reference Dell / Cloudera hardware. 
>> > >> > I have only just started exploring but I believe the cause is rooted in >> > the WriteFiles which is used by all our file based IO. WriteFiles is >> > reasonably complex with reshuffles, spilling to temporary files >> > (presumably to accommodate varying bundle sizes/avoid small files), a >> > union, a GBK etc. >> > >> > Before I go too far with exploration I'd appreciate thoughts on whether we >> > believe this is a concern (I do), if we should explore optimisations or >> > any insight from previous work in this area. >> > >> > Thanks, >> > Tim >> > >> > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
Re: [DISCUSS] Performance of write() in file based IO
Robert - much of the complexity isn't due to streaming, but rather because WriteFiles supports "dynamic" output (where the user can choose a destination file based on the input record). In practice if a pipeline is not using dynamic destinations the full graph is still generated, but much of that graph is never used (empty PCollections). On Wed, Aug 22, 2018 at 3:12 AM Robert Bradshaw wrote: > I agree that this is concerning. Some of the complexity may have also > been introduced to accommodate writing files in Streaming mode, but it > seems we should be able to execute this as a single Map operation. > > Have you profiled to see which stages and/or operations are taking up the > time? > On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson > wrote: > > > > Hi folks, > > > > I've recently been involved in projects rewriting Avro files and have > discovered a concerning performance trait in Beam. > > > > I have observed Beam between 6-20x slower than native Spark or MapReduce > code for a simple pipeline of read Avro, modify, write Avro. > > > > - Rewriting 200TB of Avro files (big cluster): 14 hrs using Beam/Spark, > 40 minutes with a map-only MR job > > - Rewriting 1.5TB Avro file (small cluster): 2 hrs using Beam/Spark, 18 > minutes using vanilla Spark code. Test code available [1] > > > > These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters (Spark / > YARN) on reference Dell / Cloudera hardware. > > > > I have only just started exploring but I believe the cause is rooted in > the WriteFiles which is used by all our file based IO. WriteFiles is > reasonably complex with reshuffles, spilling to temporary files (presumably > to accommodate varying bundle sizes/avoid small files), a union, a GBK etc. > > > > Before I go too far with exploration I'd appreciate thoughts on whether > we believe this is a concern (I do), if we should explore optimisations or > any insight from previous work in this area. 
> > > > Thanks, > > Tim > > > > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro >
Re: [DISCUSS] Performance of write() in file based IO
I think we need to dig in more to understand where the slowness is. Some context (which might not be obvious from the code): * Much of the complexity in WriteFiles is not always active. e.g. a lot of it is there to support dynamic output (where the filename is dynamically chosen based on the input record), and if you're not using dynamic output a lot of those codepaths will not be used. * There is some overhead because Beam does not assume that ParDos are deterministic (by contrast, Spark often assumes that user code is deterministic), and so inserts a shuffle to make sure that file writes are deterministic. I believe that the current Spark runner might checkpoint the entire RDD in such a case, which is quite inefficient. We should try on other runners to make sure that this issue is not specific to the Spark runner. * Spilling to temporary files is done to avoid workers crashing with out-of-memory errors. Beam attempts to write files straight out of the bundle (to avoid shuffling all the data and just shuffle filenames). However empirically when there are too many files we get large bundles and all the file write buffers cause the workers to start running out of memory; a solution is to reshuffle the data to distribute it. This will only happen if you are using windowed writes or dynamic destinations to write to dynamic locations, otherwise the spilled code path is never executed. On Wed, Aug 22, 2018 at 2:29 AM Tim Robertson wrote: > Hi folks, > > I've recently been involved in projects rewriting Avro files and have > discovered a concerning performance trait in Beam. > > I have observed Beam between 6-20x slower than native Spark or MapReduce > code for a simple pipeline of read Avro, modify, write Avro. > > - Rewriting 200TB of Avro files (big cluster): 14 hrs using Beam/Spark, > 40 minutes with a map-only MR job > - Rewriting 1.5TB Avro file (small cluster): 2 hrs using Beam/Spark, 18 > minutes using vanilla Spark code. 
Test code available [1] > > These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters (Spark / > YARN) on reference Dell / Cloudera hardware. > > I have only just started exploring but I believe the cause is rooted in > the WriteFiles which is used by all our file based IO. WriteFiles is > reasonably complex with reshuffles, spilling to temporary files (presumably > to accommodate varying bundle sizes/avoid small files), a union, a GBK etc. > > Before I go too far with exploration I'd appreciate thoughts on whether we > believe this is a concern (I do), if we should explore optimisations or any > insight from previous work in this area. > > Thanks, > Tim > > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro >
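[Editor's note] The determinism point Reuven makes can be illustrated in miniature (hypothetical names; a Python analogy, not Beam's implementation): a ParDo that writes a bundle to a randomly named temp file produces a different name if the bundle is retried, so finalization would look for a file that no longer matches. The inserted shuffle materializes the names once, and downstream retries replay the materialized values instead of re-running the non-deterministic step.

```python
import random

def write_bundle(bundle_id: int) -> str:
    # Stand-in for a ParDo that writes a bundle to a randomly named temp
    # file; the name is the non-deterministic part Beam cannot assume away.
    return f"temp-{bundle_id}-{random.getrandbits(64):016x}"

# A retried bundle recomputes a different name (collision odds ~2**-64),
# which would orphan the first temp file and confuse finalization.
assert write_bundle(0) != write_bundle(0)

# The shuffle's role: record the names once, so finalization (and any
# retry of it) sees a stable input.
materialized = {b: write_bundle(b) for b in range(3)}

def finalize(names: dict) -> list:
    # Reading the materialized names is deterministic across retries.
    return sorted(names.values())

assert finalize(materialized) == finalize(materialized)
```

This is also why a runner that implements the stable-input requirement by checkpointing the entire RDD, rather than just these small name records, pays a steep price.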
Re: [DISCUSS] Performance of write() in file based IO
> Are we seeing similar discrepancies for Flink? I am not sure I'm afraid (no easy access to flink right now). I tried without success to get Apex runner going on Cloudera YARN for this today - I'll keep trying when time allows. I've updated the DAGs to show more detail: https://github.com/gbif/beam-perf/tree/master/avro-to-avro On Wed, Aug 22, 2018 at 1:41 PM Robert Bradshaw wrote: > That is quite the DAG... Are we seeing similar discrepancies for > Flink? (Trying to understand if this is Beam->Spark translation bloat, > or inherent to the WriteFiles transform itself.) > On Wed, Aug 22, 2018 at 1:35 PM Tim Robertson > wrote: > > > > Thanks Robert > > > > > Have you profiled to see which stages and/or operations are taking up > the time? > > > > Not yet. I'm browsing through the spark DAG produced which I've > committed [1] and reading the code. > > > > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro > > > > On Wed, Aug 22, 2018 at 12:12 PM Robert Bradshaw > wrote: > >> > >> I agree that this is concerning. Some of the complexity may have also > >> been introduced to accommodate writing files in Streaming mode, but it > >> seems we should be able to execute this as a single Map operation. > >> > >> Have you profiled to see which stages and/or operations are taking up > the time? > >> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson > >> wrote: > >> > > >> > Hi folks, > >> > > >> > I've recently been involved in projects rewriting Avro files and have > discovered a concerning performance trait in Beam. > >> > > >> > I have observed Beam between 6-20x slower than native Spark or > MapReduce code for a simple pipeline of read Avro, modify, write Avro. > >> > > >> > - Rewriting 200TB of Avro files (big cluster): 14 hrs using > Beam/Spark, 40 minutes with a map-only MR job > >> > - Rewriting 1.5TB Avro file (small cluster): 2 hrs using Beam/Spark, > 18 minutes using vanilla Spark code. 
Test code available [1] > >> > > >> > These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters > (Spark / YARN) on reference Dell / Cloudera hardware. > >> > > >> > I have only just started exploring but I believe the cause is rooted > in the WriteFiles which is used by all our file based IO. WriteFiles is > reasonably complex with reshuffles, spilling to temporary files (presumably > to accommodate varying bundle sizes/avoid small files), a union, a GBK etc. > >> > > >> > Before I go too far with exploration I'd appreciate thoughts on > whether we believe this is a concern (I do), if we should explore > optimisations or any insight from previous work in this area. > >> > > >> > Thanks, > >> > Tim > >> > > >> > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro >
Re: [DISCUSS] Performance of write() in file based IO
That is quite the DAG... Are we seeing similar discrepancies for Flink? (Trying to understand if this is Beam->Spark translation bloat, or inherent to the WriteFiles transform itself.) On Wed, Aug 22, 2018 at 1:35 PM Tim Robertson wrote: > > Thanks Robert > > > Have you profiled to see which stages and/or operations are taking up the > > time? > > Not yet. I'm browsing through the spark DAG produced which I've committed [1] > and reading the code. > > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro > > On Wed, Aug 22, 2018 at 12:12 PM Robert Bradshaw wrote: >> >> I agree that this is concerning. Some of the complexity may have also >> been introduced to accommodate writing files in Streaming mode, but it >> seems we should be able to execute this as a single Map operation. >> >> Have you profiled to see which stages and/or operations are taking up the >> time? >> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson >> wrote: >> > >> > Hi folks, >> > >> > I've recently been involved in projects rewriting Avro files and have >> > discovered a concerning performance trait in Beam. >> > >> > I have observed Beam between 6-20x slower than native Spark or MapReduce >> > code for a simple pipeline of read Avro, modify, write Avro. >> > >> > - Rewriting 200TB of Avro files (big cluster): 14 hrs using Beam/Spark, >> > 40 minutes with a map-only MR job >> > - Rewriting 1.5TB Avro file (small cluster): 2 hrs using Beam/Spark, 18 >> > minutes using vanilla Spark code. Test code available [1] >> > >> > These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters (Spark / >> > YARN) on reference Dell / Cloudera hardware. >> > >> > I have only just started exploring but I believe the cause is rooted in >> > the WriteFiles which is used by all our file based IO. WriteFiles is >> > reasonably complex with reshuffles, spilling to temporary files >> > (presumably to accommodate varying bundle sizes/avoid small files), a >> > union, a GBK etc. 
>> > >> > Before I go too far with exploration I'd appreciate thoughts on whether we >> > believe this is a concern (I do), if we should explore optimisations or >> > any insight from previous work in this area. >> > >> > Thanks, >> > Tim >> > >> > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
Re: [DISCUSS] Performance of write() in file based IO
Thanks Robert > Have you profiled to see which stages and/or operations are taking up the time? Not yet. I'm browsing through the spark DAG produced which I've committed [1] and reading the code. [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro On Wed, Aug 22, 2018 at 12:12 PM Robert Bradshaw wrote: > I agree that this is concerning. Some of the complexity may have also > been introduced to accommodate writing files in Streaming mode, but it > seems we should be able to execute this as a single Map operation. > > Have you profiled to see which stages and/or operations are taking up the > time? > On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson > wrote: > > > > Hi folks, > > > > I've recently been involved in projects rewriting Avro files and have > discovered a concerning performance trait in Beam. > > > > I have observed Beam between 6-20x slower than native Spark or MapReduce > code for a simple pipeline of read Avro, modify, write Avro. > > > > - Rewriting 200TB of Avro files (big cluster): 14 hrs using Beam/Spark, > 40 minutes with a map-only MR job > > - Rewriting 1.5TB Avro file (small cluster): 2 hrs using Beam/Spark, 18 > minutes using vanilla Spark code. Test code available [1] > > > > These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters (Spark / > YARN) on reference Dell / Cloudera hardware. > > > > I have only just started exploring but I believe the cause is rooted in > the WriteFiles which is used by all our file based IO. WriteFiles is > reasonably complex with reshuffles, spilling to temporary files (presumably > to accommodate varying bundle sizes/avoid small files), a union, a GBK etc. > > > > Before I go too far with exploration I'd appreciate thoughts on whether > we believe this is a concern (I do), if we should explore optimisations or > any insight from previous work in this area. > > > > Thanks, > > Tim > > > > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro >
Re: [DISCUSS] Performance of write() in file based IO
I agree that this is concerning. Some of the complexity may have also been introduced to accommodate writing files in Streaming mode, but it seems we should be able to execute this as a single Map operation. Have you profiled to see which stages and/or operations are taking up the time? On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson wrote: > > Hi folks, > > I've recently been involved in projects rewriting Avro files and have > discovered a concerning performance trait in Beam. > > I have observed Beam between 6-20x slower than native Spark or MapReduce code > for a simple pipeline of read Avro, modify, write Avro. > > - Rewriting 200TB of Avro files (big cluster): 14 hrs using Beam/Spark, 40 > minutes with a map-only MR job > - Rewriting 1.5TB Avro file (small cluster): 2 hrs using Beam/Spark, 18 > minutes using vanilla Spark code. Test code available [1] > > These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters (Spark / > YARN) on reference Dell / Cloudera hardware. > > I have only just started exploring but I believe the cause is rooted in the > WriteFiles which is used by all our file based IO. WriteFiles is reasonably > complex with reshuffles, spilling to temporary files (presumably to > accommodate varying bundle sizes/avoid small files), a union, a GBK etc. > > Before I go too far with exploration I'd appreciate thoughts on whether we > believe this is a concern (I do), if we should explore optimisations or any > insight from previous work in this area. > > Thanks, > Tim > > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
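[Editor's note] The baseline Tim compares against — a map-only job — can be sketched in miniature (JSON lines standing in for Avro to keep the sketch stdlib-only; all names hypothetical): each shard is read, transformed, written to a temp file, and renamed into place, with no shuffle, GBK, or spilling anywhere.

```python
import json
import os
import tempfile

def transform(record: dict) -> dict:
    # Placeholder for the "modify" step of the read-modify-write pipeline.
    return {**record, "processed": True}

def rewrite_shard(in_path: str, out_path: str) -> None:
    # The map-only shape: stream records through the transform into a temp
    # file, then rename it into place on completion.
    tmp_path = out_path + ".tmp"
    with open(in_path) as fin, open(tmp_path, "w") as fout:
        for line in fin:
            fout.write(json.dumps(transform(json.loads(line))) + "\n")
    os.rename(tmp_path, out_path)

d = tempfile.mkdtemp()
src = os.path.join(d, "shard-0.in")
dst = os.path.join(d, "shard-0.out")
with open(src, "w") as f:
    f.write(json.dumps({"id": 1}) + "\n")
rewrite_shard(src, dst)
```

Whether WriteFiles can collapse to this shape for the non-dynamic, non-windowed case is exactly the question raised above.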