> Does HDFS support a fast rename operation?

Yes. From the shell it is "hdfs dfs -mv" and in the Java API it is
FileSystem#rename(Path src, Path dst). I am not aware of a fast copy though.
I think an HDFS copy streams the bytes through the driver (unless a distcp
is issued, which is an MR job).
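As a rough illustration of the difference (not Beam code; the paths and the
deleteSource flag are just made up for the example), against the Hadoop
FileSystem API it looks like this:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

public class RenameVsCopy {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    // Hypothetical temp/output paths, for illustration only.
    Path src = new Path("/tmp/beam-temp/part-00000.avro");
    Path dst = new Path("/data/output/part-00000.avro");

    // Option 1 - rename: a metadata-only operation on the NameNode;
    // no bytes are read or written.
    boolean renamed = fs.rename(src, dst);

    // Option 2 - copy then delete: opens the source and streams every
    // byte through the calling process before removing the source.
    // (Commented out since the source was already renamed above.)
    // boolean copied = FileUtil.copy(fs, src, fs, dst, /* deleteSource */ true, conf);
  }
}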
(Thanks for engaging in this discussion folks)

On Wed, Aug 22, 2018 at 6:29 PM Reuven Lax <re...@google.com> wrote:

> I have another theory: in FileBasedSink.moveToOutputFiles we copy the
> temporary files to the final destination and then delete the temp files.
> Does HDFS support a fast rename operation? If so, I bet Spark is using
> that instead of paying the cost of copying the files.
>
> On Wed, Aug 22, 2018 at 8:59 AM Reuven Lax <re...@google.com> wrote:
>
>> Ismael, that should already be true. If not using dynamic destinations
>> there might be some edges in the graph that are never used (i.e. no
>> records are ever published on them), but that should not affect
>> performance. If this is not the case we should fix it.
>>
>> Reuven
>>
>> On Wed, Aug 22, 2018 at 8:50 AM Ismaël Mejía <ieme...@gmail.com> wrote:
>>
>>> The Spark runner uses the Spark broadcast mechanism to materialize the
>>> side input PCollections in the workers; not sure exactly if this is
>>> efficient or assigned in an optimal way, but it seems logical at least.
>>>
>>> Just wondering if we shouldn't better first tackle the fact that if
>>> the pipeline does not have dynamic destinations (this case) WriteFiles
>>> should not be doing so much extra magic?
>>>
>>> On Wed, Aug 22, 2018 at 5:26 PM Reuven Lax <re...@google.com> wrote:
>>> >
>>> > Often only the metadata (i.e. temp file names) is shuffled, except in
>>> > the "spilling" case (which should only happen when using dynamic
>>> > destinations).
>>> >
>>> > WriteFiles depends heavily on side inputs. How are side inputs
>>> > implemented in the Spark runner?
>>> >
>>> > On Wed, Aug 22, 2018 at 8:21 AM Robert Bradshaw <rober...@google.com> wrote:
>>> >>
>>> >> Yes, I stand corrected, dynamic writes are now much more than the
>>> >> primitive window-based naming we used to have.
>>> >>
>>> >> It would be interesting to visualize how much of this codepath is
>>> >> metadata vs. the actual data.
>>> >>
>>> >> In the case of file writing, it seems one could (maybe?) avoid
>>> >> requiring a stable input, as shards are accepted as a whole (unlike,
>>> >> say, sinks where a deterministic uid is needed for deduplication on
>>> >> retry).
>>> >>
>>> >> On Wed, Aug 22, 2018 at 4:55 PM Reuven Lax <re...@google.com> wrote:
>>> >> >
>>> >> > Robert - much of the complexity isn't due to streaming, but rather
>>> >> > because WriteFiles supports "dynamic" output (where the user can
>>> >> > choose a destination file based on the input record). In practice,
>>> >> > if a pipeline is not using dynamic destinations the full graph is
>>> >> > still generated, but much of that graph is never used (empty
>>> >> > PCollections).
>>> >> >
>>> >> > On Wed, Aug 22, 2018 at 3:12 AM Robert Bradshaw <rober...@google.com> wrote:
>>> >> >>
>>> >> >> I agree that this is concerning. Some of the complexity may have
>>> >> >> also been introduced to accommodate writing files in streaming
>>> >> >> mode, but it seems we should be able to execute this as a single
>>> >> >> Map operation.
>>> >> >>
>>> >> >> Have you profiled to see which stages and/or operations are
>>> >> >> taking up the time?
>>> >> >>
>>> >> >> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
>>> >> >> <timrobertson...@gmail.com> wrote:
>>> >> >> >
>>> >> >> > Hi folks,
>>> >> >> >
>>> >> >> > I've recently been involved in projects rewriting Avro files
>>> >> >> > and have discovered a concerning performance trait in Beam.
>>> >> >> >
>>> >> >> > I have observed Beam between 6-20x slower than native Spark or
>>> >> >> > MapReduce code for a simple pipeline of read Avro, modify,
>>> >> >> > write Avro.
>>> >> >> >
>>> >> >> > - Rewriting 200TB of Avro files (big cluster): 14 hrs using
>>> >> >> >   Beam/Spark, 40 minutes with a map-only MR job
>>> >> >> > - Rewriting a 1.5TB Avro file (small cluster): 2 hrs using
>>> >> >> >   Beam/Spark, 18 minutes using vanilla Spark code. Test code
>>> >> >> >   available [1]
>>> >> >> >
>>> >> >> > These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters
>>> >> >> > (Spark / YARN) on reference Dell / Cloudera hardware.
>>> >> >> >
>>> >> >> > I have only just started exploring, but I believe the cause is
>>> >> >> > rooted in WriteFiles, which is used by all of our file-based IO.
>>> >> >> > WriteFiles is reasonably complex, with reshuffles, spilling to
>>> >> >> > temporary files (presumably to accommodate varying bundle sizes
>>> >> >> > / avoid small files), a union, a GBK, etc.
>>> >> >> >
>>> >> >> > Before I go too far with exploration I'd appreciate thoughts on
>>> >> >> > whether we believe this is a concern (I do), whether we should
>>> >> >> > explore optimisations, or any insight from previous work in
>>> >> >> > this area.
>>> >> >> >
>>> >> >> > Thanks,
>>> >> >> > Tim
>>> >> >> >
>>> >> >> > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
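PS: for anyone who wants to see the shape of the pipeline without digging
into [1], a minimal sketch of the read Avro / modify / write Avro job looks
roughly like the following. This is not the linked test code; the schema,
paths, and pass-through "modify" DoFn are placeholders, and the single Write
step is what expands into the WriteFiles graph discussed above.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

public class AvroToAvro {
  public static void main(String[] args) {
    // Runner and filesystem configuration come from the pipeline args.
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);

    // Placeholder schema; the real data obviously has its own.
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Rec\","
            + "\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");

    PCollection<GenericRecord> records =
        p.apply("Read", AvroIO.readGenericRecords(schema)
            .from("hdfs:///data/in/*.avro"));

    PCollection<GenericRecord> modified =
        records
            .apply("Modify", ParDo.of(new DoFn<GenericRecord, GenericRecord>() {
              @ProcessElement
              public void process(ProcessContext c) {
                // Pass-through stands in for the per-record modification.
                c.output(c.element());
              }
            }))
            .setCoder(AvroCoder.of(schema));

    // This single apply is what goes through WriteFiles/FileBasedSink:
    // temp files, reshuffle, finalize, copy/rename to the final location.
    modified.apply("Write", AvroIO.writeGenericRecords(schema)
        .to("hdfs:///data/out/part")
        .withSuffix(".avro"));

    p.run().waitUntilFinish();
  }
}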