Just for reference, there is a JIRA open for
FileBasedSink.moveToOutputFiles() and filesystem move behavior:

https://issues.apache.org/jira/browse/BEAM-5036


On Wed, Aug 22, 2018 at 9:15 PM Tim Robertson <timrobertson...@gmail.com>
wrote:

> Reuven, I think you might be on to something
>
> The Beam HadoopFileSystem copy() does indeed stream through the driver
> [1], and FileBasedSink.moveToOutputFiles() seemingly uses that method [2].
> I'll cobble together a patched version that uses a rename() rather than a
> copy(), and report back my findings before we consider the implications.
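>
> For what it's worth, the patch I have in mind is roughly the following
> (an untested sketch of the idea only; the actual signatures in
> FileBasedSink may differ):
>
>   import java.io.IOException;
>   import java.util.List;
>   import org.apache.beam.sdk.io.FileSystems;
>   import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions;
>   import org.apache.beam.sdk.io.fs.ResourceId;
>
>   /**
>    * Moves temp files to their final destinations with a filesystem
>    * rename (a metadata-only operation on HDFS) instead of the current
>    * copy-then-delete.
>    */
>   static void moveViaRename(List<ResourceId> tempFiles,
>                             List<ResourceId> outputFiles) throws IOException {
>     FileSystems.rename(tempFiles, outputFiles,
>         StandardMoveOptions.IGNORE_MISSING_FILES);
>   }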
>
> Thanks
>
> [1]
> https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java#L124
> [2]
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L288
>
> On Wed, Aug 22, 2018 at 8:52 PM Tim Robertson <timrobertson...@gmail.com>
> wrote:
>
>> > Does HDFS support a fast rename operation?
>>
>> Yes. From the shell it is “mv”, and in the Java API it is “rename(Path
>> src, Path dst)”.
>> I am not aware of a fast copy though. I think an HDFS copy streams the
>> bytes through the driver (unless a distcp is issued, which runs as a
>> MapReduce job).
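>>
>> For example, to move a file without streaming any bytes (paths are
>> illustrative):
>>
>>   hdfs dfs -mv /tmp/beam-temp/part-00000 /data/output/part-00000
>>
>> or, with the Hadoop Java API:
>>
>>   import org.apache.hadoop.conf.Configuration;
>>   import org.apache.hadoop.fs.FileSystem;
>>   import org.apache.hadoop.fs.Path;
>>
>>   FileSystem fs = FileSystem.get(new Configuration());
>>   // rename() is a metadata-only operation on HDFS; returns false on failure
>>   boolean moved = fs.rename(new Path("/tmp/beam-temp/part-00000"),
>>                             new Path("/data/output/part-00000"));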
>>
>> (Thanks for engaging in this discussion folks)
>>
>>
>> On Wed, Aug 22, 2018 at 6:29 PM Reuven Lax <re...@google.com> wrote:
>>
>>> I have another theory: in FileBasedSink.moveToOutputFiles we copy the
>>> temporary files to the final destination and then delete the temp files.
>>> Does HDFS support a fast rename operation? If so, I bet Spark is using that
>>> instead of paying the cost of copying the files.
>>>
>>> On Wed, Aug 22, 2018 at 8:59 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Ismael, that should already be true. If dynamic destinations are not
>>>> used, there might be some edges in the graph that are never used (i.e.,
>>>> no records are ever published on them), but that should not affect
>>>> performance. If this is not the case, we should fix it.
>>>>
>>>> Reuven
>>>>
>>>> On Wed, Aug 22, 2018 at 8:50 AM Ismaël Mejía <ieme...@gmail.com> wrote:
>>>>
>>>>> The Spark runner uses the Spark broadcast mechanism to materialize the
>>>>> side input PCollections on the workers. I am not sure whether this is
>>>>> assigned in an optimal way, but it seems logical at least.
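>>>>>
>>>>> Roughly, it follows the standard broadcast pattern, i.e. something like
>>>>> the sketch below (names are illustrative; the real runner code is more
>>>>> involved):
>>>>>
>>>>>   // Materialize the side input once on the driver, then broadcast it
>>>>>   // so each executor reads a local copy instead of re-fetching it.
>>>>>   List<byte[]> materialized = sideInputRdd.collect();
>>>>>   Broadcast<List<byte[]>> broadcast = jsc.broadcast(materialized);
>>>>>   // ...and inside the DoFn evaluation on the workers:
>>>>>   List<byte[]> sideInput = broadcast.value();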
>>>>>
>>>>> Just wondering whether we shouldn't first tackle the fact that
>>>>> WriteFiles does so much extra magic even when the pipeline does not
>>>>> have dynamic destinations (as in this case)?
>>>>>
>>>>> On Wed, Aug 22, 2018 at 5:26 PM Reuven Lax <re...@google.com> wrote:
>>>>> >
>>>>> > Often only the metadata (i.e., the temp file names) is shuffled,
>>>>> > except in the "spilling" case (which should only happen when using
>>>>> > dynamic destinations).
>>>>> >
>>>>> > WriteFiles depends heavily on side inputs. How are side inputs
>>>>> implemented in the Spark runner?
>>>>> >
>>>>> > On Wed, Aug 22, 2018 at 8:21 AM Robert Bradshaw <rober...@google.com>
>>>>> wrote:
>>>>> >>
>>>>> >> Yes, I stand corrected; dynamic writes are now much more than the
>>>>> >> primitive window-based naming we used to have.
>>>>> >>
>>>>> >> It would be interesting to visualize how much of this codepath is
>>>>> >> metadata vs. the actual data.
>>>>> >>
>>>>> >> In the case of file writing, it seems one could (maybe?) avoid
>>>>> >> requiring a stable input, as shards are accepted as a whole (unlike,
>>>>> >> say, sinks where a deterministic uid is needed for deduplication on
>>>>> >> retry).
>>>>> >>
>>>>> >> On Wed, Aug 22, 2018 at 4:55 PM Reuven Lax <re...@google.com>
>>>>> wrote:
>>>>> >> >
>>>>> >> > Robert - much of the complexity isn't due to streaming, but rather
>>>>> >> > to the fact that WriteFiles supports "dynamic" output (where the
>>>>> >> > user can choose a destination file based on the input record). In
>>>>> >> > practice, if a pipeline is not using dynamic destinations, the full
>>>>> >> > graph is still generated, but much of that graph is never used
>>>>> >> > (empty PCollections).
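>>>>> >> >
>>>>> >> > For anyone following along, "dynamic" output here means the
>>>>> >> > FileIO.writeDynamic() style of write. A rough, illustrative sketch
>>>>> >> > (Event, getCountry and toCsvLine are made-up names; assumes the
>>>>> >> > usual FileIO / TextIO / StringUtf8Coder imports):
>>>>> >> >
>>>>> >> >   events.apply(FileIO.<String, Event>writeDynamic()
>>>>> >> >       // the destination is chosen per record
>>>>> >> >       .by((Event e) -> e.getCountry())
>>>>> >> >       .withDestinationCoder(StringUtf8Coder.of())
>>>>> >> >       .via(Contextful.fn((Event e) -> e.toCsvLine()), TextIO.sink())
>>>>> >> >       .to("hdfs:///data/output/events")
>>>>> >> >       .withNaming(country -> FileIO.Write.defaultNaming(country, ".csv")));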
>>>>> >> >
>>>>> >> > On Wed, Aug 22, 2018 at 3:12 AM Robert Bradshaw <
>>>>> rober...@google.com> wrote:
>>>>> >> >>
>>>>> >> >> I agree that this is concerning. Some of the complexity may also
>>>>> >> >> have been introduced to accommodate writing files in streaming
>>>>> >> >> mode, but it seems we should be able to execute this as a single
>>>>> >> >> Map operation.
>>>>> >> >>
>>>>> >> >> Have you profiled to see which stages and/or operations are
>>>>> taking up the time?
>>>>> >> >> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
>>>>> >> >> <timrobertson...@gmail.com> wrote:
>>>>> >> >> >
>>>>> >> >> > Hi folks,
>>>>> >> >> >
>>>>> >> >> > I've recently been involved in projects rewriting Avro files
>>>>> and have discovered a concerning performance trait in Beam.
>>>>> >> >> >
>>>>> >> >> > I have observed Beam to be between 6x and 20x slower than native
>>>>> >> >> > Spark or MapReduce code for a simple pipeline of read Avro,
>>>>> >> >> > modify, write Avro.
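>>>>> >> >> >
>>>>> >> >> > The shape of that pipeline is essentially just the following
>>>>> >> >> > (an illustrative sketch, not the exact test code; that is in [1]
>>>>> >> >> > below):
>>>>> >> >> >
>>>>> >> >> >   // assumes the usual Beam imports: Pipeline, AvroIO, ParDo, etc.
>>>>> >> >> >   Pipeline p = Pipeline.create(options);
>>>>> >> >> >   p.apply(AvroIO.readGenericRecords(schema).from("hdfs:///data/in/*.avro"))
>>>>> >> >> >    .apply(ParDo.of(new ModifyRecordFn()))   // a small per-record change
>>>>> >> >> >    .apply(AvroIO.writeGenericRecords(schema).to("hdfs:///data/out/part"));
>>>>> >> >> >   p.run().waitUntilFinish();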
>>>>> >> >> >
>>>>> >> >> >  - Rewriting 200TB of Avro files (big cluster): 14 hrs using
>>>>> >> >> > Beam/Spark, 40 minutes with a map-only MapReduce job
>>>>> >> >> >  - Rewriting a 1.5TB Avro file (small cluster): 2 hrs using
>>>>> >> >> > Beam/Spark, 18 minutes using vanilla Spark code. Test code is
>>>>> >> >> > available at [1].
>>>>> >> >> >
>>>>> >> >> > These tests were running Beam 2.6.0 on Cloudera 5.12.x
>>>>> clusters (Spark / YARN) on reference Dell / Cloudera hardware.
>>>>> >> >> >
>>>>> >> >> > I have only just started exploring, but I believe the cause is
>>>>> >> >> > rooted in WriteFiles, which is used by all of our file-based IO.
>>>>> >> >> > WriteFiles is reasonably complex, with reshuffles, spilling to
>>>>> >> >> > temporary files (presumably to accommodate varying bundle sizes
>>>>> >> >> > and avoid small files), a union, a GBK, etc.
>>>>> >> >> >
>>>>> >> >> > Before I go too far with this exploration, I'd appreciate
>>>>> >> >> > thoughts on whether we believe this is a concern (I do), whether
>>>>> >> >> > we should explore optimisations, and any insight from previous
>>>>> >> >> > work in this area.
>>>>> >> >> >
>>>>> >> >> > Thanks,
>>>>> >> >> > Tim
>>>>> >> >> >
>>>>> >> >> > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
>>>>>
>>>>
