I think we need to dig in more to understand where the slowness is. Some
context (which might not be obvious from the code):

* Much of the complexity in WriteFiles is not always active. For example, a
lot of it is there to support dynamic output (where the filename is chosen
based on the input record); if you are not using dynamic output, most of
those codepaths are never exercised (see the first sketch after this list).

* There is some overhead because Beam does not assume that ParDos are
deterministic (by contrast, Spark often assumes that user code is
deterministic), and so it inserts a shuffle to make sure that file writes
are deterministic (the second sketch below shows the general pattern). I
believe that the current Spark runner might checkpoint the entire RDD in
such a case, which is quite inefficient. We should try this on other
runners to check whether the issue is specific to the Spark runner.

* Spilling to temporary files is done to avoid workers crashing with
out-of-memory errors. Beam attempts to write files straight out of the
bundle (so that only filenames, rather than all the data, need to be
shuffled). However, empirically, when there are too many files we end up
with large bundles, and all of the open file-write buffers cause the
workers to start running out of memory; the solution is to reshuffle the
data to distribute it. This will only happen if you are using windowed
writes or dynamic destinations; otherwise the spill code path is never
executed (the third sketch below shows which writes can hit it).
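
To make the first point concrete, here is a minimal sketch of what dynamic
output looks like from the user side. FileIO.writeDynamic, TextIO.sink and
FileIO.Write.defaultNaming are the real Beam APIs; the Event class and its
getCountry()/toCsvLine() methods are made up for illustration, and the
usual Beam SDK imports are assumed:

    // A PCollection<Event> where each record picks its own output file;
    // this is the case that exercises the dynamic-destination codepaths
    // in WriteFiles.
    events.apply(FileIO.<String, Event>writeDynamic()
        .by(event -> event.getCountry())                     // destination chosen per record
        .via(Contextful.fn(Event::toCsvLine), TextIO.sink()) // one text line per record
        .to("/output/events")
        .withDestinationCoder(StringUtf8Coder.of())
        .withNaming(country ->
            FileIO.Write.defaultNaming("events-" + country, ".csv")));

If you never use writeDynamic (or the dynamic-destination variants of the
file IOs), the corresponding branches in WriteFiles should stay dormant.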
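
For the second point, the pattern is roughly the following (a greatly
simplified sketch, not the actual WriteFiles code; WriteShardFn is a
hypothetical DoFn that performs the file writes, while Reshuffle is the
real Beam transform):

    // Force a shuffle so the elements feeding the file-writing ParDo are
    // stable across retries, even if upstream user code is non-deterministic.
    records
        .apply("StabilizeInput", Reshuffle.viaRandomKey())
        .apply("WriteShards", ParDo.of(new WriteShardFn()));

The materialisation behind that shuffle is what I suspect the Spark runner
is paying for with a full RDD checkpoint.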
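
And for the third point, a sketch of which writes can hit the spill path
(AvroRecord is an illustrative class; the AvroIO, Window and FixedWindows
calls are the real APIs):

    // Plain bounded write: files are written straight out of each bundle
    // and the spill path is never taken.
    records.apply(AvroIO.write(AvroRecord.class).to("/output/records"));

    // Windowed write: one of the cases where the spill/reshuffle path can
    // kick in to keep per-bundle write buffers bounded.
    records
        .apply(Window.<AvroRecord>into(
            FixedWindows.of(Duration.standardMinutes(10))))
        .apply(AvroIO.write(AvroRecord.class)
            .to("/output/records")
            .withWindowedWrites()
            .withNumShards(10));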

On Wed, Aug 22, 2018 at 2:29 AM Tim Robertson <timrobertson...@gmail.com>
wrote:

> Hi folks,
>
> I've recently been involved in projects rewriting Avro files and have
> discovered a concerning performance trait in Beam.
>
> I have observed Beam to be between 6x and 20x slower than native Spark or
> MapReduce code for a simple pipeline of read Avro, modify, write Avro.
>
>  - Rewriting 200TB of Avro files (big cluster): 14 hrs using Beam/Spark,
> 40 minutes with a map-only MR job
>  - Rewriting a 1.5TB Avro file (small cluster): 2 hrs using Beam/Spark, 18
> minutes using vanilla Spark code. Test code is available [1].
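>
> For reference, a minimal sketch of the shape of the pipeline being tested
> (the real code is at [1]; ModifyFn is only an illustrative DoFn and the
> schema handling is elided):
>
>     PCollection<GenericRecord> records =
>         p.apply(AvroIO.readGenericRecords(schema).from("/input/*.avro"));
>     records
>         .apply(ParDo.of(new ModifyFn()))  // the "modify" step
>         .apply(AvroIO.writeGenericRecords(schema).to("/output/part"));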
>
> These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters (Spark /
> YARN) on reference Dell / Cloudera hardware.
>
> I have only just started exploring, but I believe the cause is rooted in
> WriteFiles, which is used by all of our file-based IO. WriteFiles is
> reasonably complex, with reshuffles, spilling to temporary files
> (presumably to accommodate varying bundle sizes / avoid small files), a
> union, a GBK, etc.
>
> Before I go too far with this exploration, I'd appreciate thoughts on
> whether we believe this is a concern (I do), whether we should explore
> optimisations, or any insight from previous work in this area.
>
> Thanks,
> Tim
>
> [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
>
