Just for reference, there is a JIRA open for FileBasedSink.moveToOutputFiles() and filesystem move behavior:
https://issues.apache.org/jira/browse/BEAM-5036
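[For illustration only — this is not the Beam implementation — a minimal sketch of the rename-based move discussed in this thread, using the Hadoop FileSystem API; the paths are hypothetical:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RenameSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Hypothetical paths standing in for a temporary file and its
    // final destination.
    Path src = new Path("hdfs:///tmp/.temp-beam/part-00000.avro");
    Path dst = new Path("hdfs:///output/part-00000.avro");

    FileSystem fs = src.getFileSystem(conf);

    // On HDFS, rename() is a metadata-only operation on the NameNode:
    // no file bytes are read or written, so it is cheap regardless of
    // file size.
    if (!fs.rename(src, dst)) {
      throw new java.io.IOException("rename failed: " + src + " -> " + dst);
    }

    // A copy, by contrast, must stream every byte through the client
    // (open src, read fully, write to dst), which is what makes a
    // copy-then-delete "move" expensive for large outputs.
  }
}
]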
On Wed, Aug 22, 2018 at 9:15 PM Tim Robertson <timrobertson...@gmail.com> wrote:

> Reuven, I think you might be on to something.
>
> The Beam HadoopFileSystem copy() does indeed stream through the driver
> [1], and FileBasedSink.moveToOutputFiles() seemingly uses that method [2].
> I'll cobble together a patched version to test using a rename() rather
> than a copy() and report back findings before we consider the implications.
>
> Thanks
>
> [1] https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java#L124
> [2] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L288
>
> On Wed, Aug 22, 2018 at 8:52 PM Tim Robertson <timrobertson...@gmail.com> wrote:
>
>> > Does HDFS support a fast rename operation?
>>
>> Yes. From the shell it is “mv” and in the Java API it is “rename(Path
>> src, Path dst)”.
>> I am not aware of a fast copy, though. I think an HDFS copy streams the
>> bytes through the driver (unless a distcp is issued, which is an MR job).
>>
>> (Thanks for engaging in this discussion, folks)
>>
>> On Wed, Aug 22, 2018 at 6:29 PM Reuven Lax <re...@google.com> wrote:
>>
>>> I have another theory: in FileBasedSink.moveToOutputFiles we copy the
>>> temporary files to the final destination and then delete the temp files.
>>> Does HDFS support a fast rename operation? If so, I bet Spark is using
>>> that instead of paying the cost of copying the files.
>>>
>>> On Wed, Aug 22, 2018 at 8:59 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Ismaël, that should already be true. If not using dynamic destinations
>>>> there might be some edges in the graph that are never used (i.e. no
>>>> records are ever published on them), but that should not affect
>>>> performance. If this is not the case we should fix it.
>>>>
>>>> Reuven
>>>>
>>>> On Wed, Aug 22, 2018 at 8:50 AM Ismaël Mejía <ieme...@gmail.com> wrote:
>>>>
>>>>> The Spark runner uses the Spark broadcast mechanism to materialize the
>>>>> side-input PCollections in the workers. I am not sure whether this is
>>>>> assigned in an optimal way, but it seems logical at least.
>>>>>
>>>>> Just wondering if we shouldn't first tackle the fact that if the
>>>>> pipeline does not have dynamic destinations (this case), WriteFiles
>>>>> should not be doing so much extra magic?
>>>>>
>>>>> On Wed, Aug 22, 2018 at 5:26 PM Reuven Lax <re...@google.com> wrote:
>>>>> >
>>>>> > Often only the metadata (i.e. the temp file names) is shuffled,
>>>>> > except in the "spilling" case (which should only happen when using
>>>>> > dynamic destinations).
>>>>> >
>>>>> > WriteFiles depends heavily on side inputs. How are side inputs
>>>>> > implemented in the Spark runner?
>>>>> >
>>>>> > On Wed, Aug 22, 2018 at 8:21 AM Robert Bradshaw <rober...@google.com> wrote:
>>>>> >>
>>>>> >> Yes, I stand corrected; dynamic writes are now much more than the
>>>>> >> primitive window-based naming we used to have.
>>>>> >>
>>>>> >> It would be interesting to visualize how much of this codepath is
>>>>> >> metadata vs. the actual data.
>>>>> >>
>>>>> >> In the case of file writing, it seems one could (maybe?) avoid
>>>>> >> requiring a stable input, as shards are accepted as a whole (unlike,
>>>>> >> say, sinks where a deterministic uid is needed for deduplication on
>>>>> >> retry).
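[As a side note on the side-input question above, a minimal standalone illustration of the Spark broadcast mechanism Ismaël describes; this is plain Spark rather than Beam runner code, and the map contents are hypothetical:

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

public class BroadcastSketch {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext("local[*]", "broadcast-sketch");

    // A small dataset materialized on the driver, standing in for a
    // side-input PCollection (hypothetical contents).
    Broadcast<Map<String, String>> side =
        sc.broadcast(Collections.singletonMap("key", "value"));

    // Tasks running on the workers read the broadcast value; Spark ships
    // it to each executor once rather than once per task.
    List<String> out = sc.parallelize(Arrays.asList("key", "other"))
        .map(k -> side.value().getOrDefault(k, "<miss>"))
        .collect();

    System.out.println(out);
    sc.stop();
  }
}
]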
>>>>> >> On Wed, Aug 22, 2018 at 4:55 PM Reuven Lax <re...@google.com> wrote:
>>>>> >> >
>>>>> >> > Robert - much of the complexity isn't due to streaming, but rather
>>>>> >> > because WriteFiles supports "dynamic" output (where the user can
>>>>> >> > choose a destination file based on the input record). In practice,
>>>>> >> > if a pipeline is not using dynamic destinations, the full graph is
>>>>> >> > still generated, but much of that graph is never used (empty
>>>>> >> > PCollections).
>>>>> >> >
>>>>> >> > On Wed, Aug 22, 2018 at 3:12 AM Robert Bradshaw <rober...@google.com> wrote:
>>>>> >> >>
>>>>> >> >> I agree that this is concerning. Some of the complexity may also
>>>>> >> >> have been introduced to accommodate writing files in streaming
>>>>> >> >> mode, but it seems we should be able to execute this as a single
>>>>> >> >> Map operation.
>>>>> >> >>
>>>>> >> >> Have you profiled to see which stages and/or operations are
>>>>> >> >> taking up the time?
>>>>> >> >>
>>>>> >> >> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
>>>>> >> >> <timrobertson...@gmail.com> wrote:
>>>>> >> >> >
>>>>> >> >> > Hi folks,
>>>>> >> >> >
>>>>> >> >> > I've recently been involved in projects rewriting Avro files
>>>>> >> >> > and have discovered a concerning performance trait in Beam.
>>>>> >> >> >
>>>>> >> >> > I have observed Beam to be 6-20x slower than native Spark or
>>>>> >> >> > MapReduce code for a simple pipeline of read Avro, modify,
>>>>> >> >> > write Avro:
>>>>> >> >> >
>>>>> >> >> > - Rewriting 200TB of Avro files (big cluster): 14 hrs using
>>>>> >> >> > Beam/Spark, 40 minutes with a map-only MR job
>>>>> >> >> > - Rewriting a 1.5TB Avro file (small cluster): 2 hrs using
>>>>> >> >> > Beam/Spark, 18 minutes using vanilla Spark code. Test code
>>>>> >> >> > available [1]
>>>>> >> >> >
>>>>> >> >> > These tests were run with Beam 2.6.0 on Cloudera 5.12.x
>>>>> >> >> > clusters (Spark / YARN) on reference Dell / Cloudera hardware.
>>>>> >> >> >
>>>>> >> >> > I have only just started exploring, but I believe the cause is
>>>>> >> >> > rooted in WriteFiles, which is used by all our file-based IO.
>>>>> >> >> > WriteFiles is reasonably complex, with reshuffles, spilling to
>>>>> >> >> > temporary files (presumably to accommodate varying bundle
>>>>> >> >> > sizes / avoid small files), a union, a GBK, etc.
>>>>> >> >> >
>>>>> >> >> > Before I go too far with exploration I'd appreciate thoughts on
>>>>> >> >> > whether we believe this is a concern (I do), whether we should
>>>>> >> >> > explore optimisations, or any insight from previous work in
>>>>> >> >> > this area.
>>>>> >> >> >
>>>>> >> >> > Thanks,
>>>>> >> >> > Tim
>>>>> >> >> >
>>>>> >> >> > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
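[For context, a minimal sketch of the read-Avro / modify / write-Avro pipeline shape being benchmarked, assuming GenericRecords; the schema, paths, and identity "modify" step are placeholders, not the actual test code (which is in [1] above):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class AvroToAvroSketch {
  public static void main(String[] args) {
    // Placeholder schema; in the real test it comes from the input files.
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"R\",\"fields\":"
            + "[{\"name\":\"f\",\"type\":\"string\"}]}");

    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply("Read", AvroIO.readGenericRecords(schema)
            .from("hdfs:///input/*.avro"))           // hypothetical path
        .apply("Modify", MapElements
            .into(TypeDescriptor.of(GenericRecord.class))
            .via((GenericRecord r) -> r))            // identity stand-in for the "modify" step
        .setCoder(AvroCoder.of(schema))              // GenericRecord needs an explicit coder
        .apply("Write", AvroIO.writeGenericRecords(schema)
            .to("hdfs:///output/part"));             // expands to WriteFiles under the hood

    p.run().waitUntilFinish();
  }
}
]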