Reuven, I think you might be on to something

The Beam HadoopFileSystem copy() does indeed stream the bytes through the
driver [1], and FileBasedSink.moveToOutputFiles() does appear to use that
method [2]. I'll cobble together a patched version that uses a rename()
rather than a copy(), test it, and report back with findings before we
consider the implications.
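
Roughly what I have in mind (an untested sketch, not the actual Beam
method; the real copy() is at [1], and names here are approximate):

  // Sketch: in HadoopFileSystem, swap the byte-streaming copy for a rename.
  // On HDFS, rename() is a metadata-only operation in the NameNode, so no
  // bytes flow through the client.
  private void renameInsteadOfCopy(
      org.apache.hadoop.fs.FileSystem fs,
      java.util.List<org.apache.hadoop.fs.Path> srcPaths,
      java.util.List<org.apache.hadoop.fs.Path> destPaths)
      throws java.io.IOException {
    for (int i = 0; i < srcPaths.size(); ++i) {
      if (!fs.rename(srcPaths.get(i), destPaths.get(i))) {
        throw new java.io.IOException(String.format(
            "Unable to rename %s to %s", srcPaths.get(i), destPaths.get(i)));
      }
    }
  }

One thing to verify: copy()-then-delete() is not strictly equivalent to
rename() when the destination already exists, so the semantics need
checking as part of those implications.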

Thanks

[1] https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java#L124
[2] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L288

On Wed, Aug 22, 2018 at 8:52 PM Tim Robertson <timrobertson...@gmail.com>
wrote:

> > Does HDFS support a fast rename operation?
>
> Yes. From the shell it is “mv” and in the Java API it is “rename(Path src,
> Path dst)”.
> I am not aware of a fast copy though. I think an HDFS copy streams the
> bytes through the driver (unless a distcp is issued, which is an MR job).
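>
> For reference, in the Java API the rename is just a metadata operation,
> along these lines (a sketch; the paths are hypothetical):
>
>   import org.apache.hadoop.conf.Configuration;
>   import org.apache.hadoop.fs.FileSystem;
>   import org.apache.hadoop.fs.Path;
>
>   Configuration conf = new Configuration();
>   FileSystem fs = FileSystem.get(conf);
>   // rename() only updates NameNode metadata; no bytes are moved.
>   boolean renamed =
>       fs.rename(new Path("/tmp/part-00000"), new Path("/out/part-00000"));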
>
> (Thanks for engaging in this discussion folks)
>
>
> On Wed, Aug 22, 2018 at 6:29 PM Reuven Lax <re...@google.com> wrote:
>
>> I have another theory: in FileBasedSink.moveToOutputFiles we copy the
>> temporary files to the final destination and then delete the temp files.
>> Does HDFS support a fast rename operation? If so, I bet Spark is using that
>> instead of paying the cost of copying the files.
>>
>> On Wed, Aug 22, 2018 at 8:59 AM Reuven Lax <re...@google.com> wrote:
>>
>>> Ismael, that should already be true. If not using dynamic destinations,
>>> there might be some edges in the graph that are never used (i.e. no records
>>> are ever published on them), but that should not affect performance. If
>>> this is not the case, we should fix it.
>>>
>>> Reuven
>>>
>>> On Wed, Aug 22, 2018 at 8:50 AM Ismaël Mejía <ieme...@gmail.com> wrote:
>>>
>>>> The Spark runner uses the Spark broadcast mechanism to materialize the
>>>> side input PCollections in the workers. I am not sure whether the
>>>> assignment is done in an optimal way, but it seems logical at least.
>>>>
>>>> Just wondering whether we shouldn't first tackle the fact that when
>>>> the pipeline does not have dynamic destinations (as in this case),
>>>> WriteFiles should not be doing so much extra magic.
>>>>
>>>> On Wed, Aug 22, 2018 at 5:26 PM Reuven Lax <re...@google.com> wrote:
>>>> >
>>>> > Often only the metadata (i.e. temp file names) is shuffled, except
>>>> in the "spilling" case (which should only happen when using dynamic
>>>> destinations).
>>>> >
>>>> > WriteFiles depends heavily on side inputs. How are side inputs
>>>> implemented in the Spark runner?
>>>> >
>>>> > On Wed, Aug 22, 2018 at 8:21 AM Robert Bradshaw <rober...@google.com>
>>>> wrote:
>>>> >>
>>>> >> Yes, I stand corrected; dynamic writes are now much more than the
>>>> >> primitive window-based naming we used to have.
>>>> >>
>>>> >> It would be interesting to visualize how much of this codepath is
>>>> >> metadata vs. the actual data.
>>>> >>
>>>> >> In the case of file writing, it seems one could (maybe?) avoid
>>>> >> requiring a stable input, as shards are accepted as a whole (unlike,
>>>> >> say, sinks where a deterministic uid is needed for deduplication on
>>>> >> retry).
>>>> >>
>>>> >> On Wed, Aug 22, 2018 at 4:55 PM Reuven Lax <re...@google.com> wrote:
>>>> >> >
>>>> >> > Robert - much of the complexity isn't due to streaming, but rather
>>>> because WriteFiles supports "dynamic" output (where the user can choose a
>>>> destination file based on the input record). In practice, if a pipeline is
>>>> not using dynamic destinations, the full graph is still generated, but much
>>>> of that graph is never used (empty PCollections).
>>>> >> >
>>>> >> > On Wed, Aug 22, 2018 at 3:12 AM Robert Bradshaw <
>>>> rober...@google.com> wrote:
>>>> >> >>
>>>> >> >> I agree that this is concerning. Some of the complexity may also
>>>> >> >> have been introduced to accommodate writing files in streaming
>>>> >> >> mode, but it seems we should be able to execute this as a single
>>>> >> >> Map operation.
>>>> >> >>
>>>> >> >> Have you profiled to see which stages and/or operations are
>>>> taking up the time?
>>>> >> >> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
>>>> >> >> <timrobertson...@gmail.com> wrote:
>>>> >> >> >
>>>> >> >> > Hi folks,
>>>> >> >> >
>>>> >> >> > I've recently been involved in projects rewriting Avro files
>>>> and have discovered a concerning performance trait in Beam.
>>>> >> >> >
>>>> >> >> > I have observed Beam to be 6-20x slower than native Spark or
>>>> MapReduce code for a simple pipeline of read Avro, modify, write Avro.
>>>> >> >> >
>>>> >> >> >  - Rewriting 200TB of Avro files (big cluster): 14 hrs using
>>>> Beam/Spark, 40 minutes with a map-only MR job
>>>> >> >> >  - Rewriting 1.5TB Avro file (small cluster): 2 hrs using
>>>> Beam/Spark, 18 minutes using vanilla Spark code. Test code available [1]
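>>>> >> >> >
>>>> >> >> > For context, the pipeline under test is roughly of this shape (a
>>>> >> >> > sketch, not the exact code in [1]; ModifyFn stands in for a
>>>> >> >> > hypothetical trivial DoFn, and options/schema/input/output are
>>>> >> >> > assumed):
>>>> >> >> >
>>>> >> >> >   import org.apache.avro.generic.GenericRecord;
>>>> >> >> >   import org.apache.beam.sdk.Pipeline;
>>>> >> >> >   import org.apache.beam.sdk.io.AvroIO;
>>>> >> >> >   import org.apache.beam.sdk.transforms.ParDo;
>>>> >> >> >
>>>> >> >> >   Pipeline p = Pipeline.create(options);
>>>> >> >> >   p.apply(AvroIO.readGenericRecords(schema).from(input))
>>>> >> >> >    .apply(ParDo.of(new ModifyFn()))  // simple per-record change
>>>> >> >> >    .apply(AvroIO.writeGenericRecords(schema).to(output));
>>>> >> >> >   p.run().waitUntilFinish();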
>>>> >> >> >
>>>> >> >> > These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters
>>>> (Spark / YARN) on reference Dell / Cloudera hardware.
>>>> >> >> >
>>>> >> >> > I have only just started exploring, but I believe the cause is
>>>> rooted in WriteFiles, which is used by all our file-based IO. WriteFiles
>>>> is reasonably complex, with reshuffles, spilling to temporary files
>>>> (presumably to accommodate varying bundle sizes / avoid small files), a
>>>> union, a GBK, etc.
>>>> >> >> >
>>>> >> >> > Before I go too far with the exploration, I'd appreciate thoughts
>>>> on whether we believe this is a concern (I do), whether we should explore
>>>> optimisations, and any insight from previous work in this area.
>>>> >> >> >
>>>> >> >> > Thanks,
>>>> >> >> > Tim
>>>> >> >> >
>>>> >> >> > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
>>>>
>>>
