I have another theory: in FileBasedSink.moveToOutputFiles we copy the temporary files to the final destination and then delete the temp files. Does HDFS support a fast rename operation? If so, I bet Spark is using that instead of paying the cost of copying the files.
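
To make the distinction concrete, here is a rough sketch of the two finalization strategies. It runs on the local filesystem only and is not Beam's or Spark's actual code: the helper names (`finalize_by_copy`, `finalize_by_rename`, `make_temp_file`) and the file names are made up for illustration. The assumption is that a rename within one filesystem just relinks the file, while copy-then-delete rewrites every byte.

```python
import os
import shutil
import tempfile


def finalize_by_copy(tmp_path, final_path):
    """Copy every byte to the destination, then delete the temporary file."""
    shutil.copyfile(tmp_path, final_path)
    os.remove(tmp_path)


def finalize_by_rename(tmp_path, final_path):
    """Relink the file under its final name (no data copy on the same volume)."""
    os.replace(tmp_path, final_path)


def make_temp_file(directory, name, payload):
    """Hypothetical helper: write a payload to a temporary file and return its path."""
    path = os.path.join(directory, name)
    with open(path, "wb") as handle:
        handle.write(payload)
    return path


workdir = tempfile.mkdtemp()
payload = b"avro-bytes" * 1000  # stand-in for real file contents

tmp_a = make_temp_file(workdir, "tmp-a", payload)
tmp_b = make_temp_file(workdir, "tmp-b", payload)
copied = os.path.join(workdir, "copied.avro")
renamed = os.path.join(workdir, "renamed.avro")

finalize_by_copy(tmp_a, copied)      # cost grows with the file size
finalize_by_rename(tmp_b, renamed)   # cost is independent of the file size
```

Both paths end in the same state (final file present, temp file gone), so if the write path is where the time goes, the difference would only show up in how long finalization takes on large files.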

On Wed, Aug 22, 2018 at 8:59 AM Reuven Lax <re...@google.com> wrote:

> Ismael, that should already be true. If not using dynamic destinations
> there might be some edges in the graph that are never used (i.e. no records
> are ever published on them), but that should not affect performance. If
> this is not the case we should fix it.
>
> Reuven
>
> On Wed, Aug 22, 2018 at 8:50 AM Ismaël Mejía <ieme...@gmail.com> wrote:
>
>> Spark runner uses the Spark broadcast mechanism to materialize the
>> side input PCollections in the workers, not sure exactly if this is
>> efficient or assigned in an optimal way, but it seems logical at least.
>>
>> Just wondering if we shouldn't better first tackle the fact that if
>> the pipeline does not have dynamic destinations (this case) WriteFiles
>> should not be doing so much extra magic?
>>
>> On Wed, Aug 22, 2018 at 5:26 PM Reuven Lax <re...@google.com> wrote:
>> >
>> > Often only the metadata (i.e. temp file names) are shuffled, except
>> > in the "spilling" case (which should only happen when using dynamic
>> > destinations).
>> >
>> > WriteFiles depends heavily on side inputs. How are side inputs
>> > implemented in the Spark runner?
>> >
>> > On Wed, Aug 22, 2018 at 8:21 AM Robert Bradshaw <rober...@google.com>
>> > wrote:
>> >>
>> >> Yes, I stand corrected, dynamic writes is now much more than the
>> >> primitive window-based naming we used to have.
>> >>
>> >> It would be interesting to visualize how much of this codepath is
>> >> metadata vs. the actual data.
>> >>
>> >> In the case of file writing, it seems one could (maybe?) avoid
>> >> requiring a stable input, as shards are accepted as a whole (unlike,
>> >> say, sinks where a deterministic uid is needed for deduplication on
>> >> retry).
>> >>
>> >> On Wed, Aug 22, 2018 at 4:55 PM Reuven Lax <re...@google.com> wrote:
>> >> >
>> >> > Robert - much of the complexity isn't due to streaming, but rather
>> >> > because WriteFiles supports "dynamic" output (where the user can
>> >> > choose a destination file based on the input record). In practice
>> >> > if a pipeline is not using dynamic destinations the full graph is
>> >> > still generated, but much of that graph is never used (empty
>> >> > PCollections).
>> >> >
>> >> > On Wed, Aug 22, 2018 at 3:12 AM Robert Bradshaw
>> >> > <rober...@google.com> wrote:
>> >> >>
>> >> >> I agree that this is concerning. Some of the complexity may have
>> >> >> also been introduced to accommodate writing files in Streaming
>> >> >> mode, but it seems we should be able to execute this as a single
>> >> >> Map operation.
>> >> >>
>> >> >> Have you profiled to see which stages and/or operations are
>> >> >> taking up the time?
>> >> >>
>> >> >> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
>> >> >> <timrobertson...@gmail.com> wrote:
>> >> >> >
>> >> >> > Hi folks,
>> >> >> >
>> >> >> > I've recently been involved in projects rewriting Avro files
>> >> >> > and have discovered a concerning performance trait in Beam.
>> >> >> >
>> >> >> > I have observed Beam between 6-20x slower than native Spark or
>> >> >> > MapReduce code for a simple pipeline of read Avro, modify,
>> >> >> > write Avro.
>> >> >> >
>> >> >> > - Rewriting 200TB of Avro files (big cluster): 14 hrs using
>> >> >> >   Beam/Spark, 40 minutes with a map-only MR job
>> >> >> > - Rewriting 1.5TB Avro file (small cluster): 2 hrs using
>> >> >> >   Beam/Spark, 18 minutes using vanilla Spark code. Test code
>> >> >> >   available [1]
>> >> >> >
>> >> >> > These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters
>> >> >> > (Spark / YARN) on reference Dell / Cloudera hardware.
>> >> >> >
>> >> >> > I have only just started exploring but I believe the cause is
>> >> >> > rooted in the WriteFiles which is used by all our file-based
>> >> >> > IO. WriteFiles is reasonably complex with reshuffles, spilling
>> >> >> > to temporary files (presumably to accommodate varying bundle
>> >> >> > sizes/avoid small files), a union, a GBK etc.
>> >> >> >
>> >> >> > Before I go too far with exploration I'd appreciate thoughts on
>> >> >> > whether we believe this is a concern (I do), if we should
>> >> >> > explore optimisations or any insight from previous work in this
>> >> >> > area.
>> >> >> >
>> >> >> > Thanks,
>> >> >> > Tim
>> >> >> >
>> >> >> > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro