We have Beam 2.15.0 batch jobs running on the Spark runner. I had a bit of
a tricky time with spark.default.parallelism, but in the end it works fine
for us (custom parallelism on source stages, and spark.default.parallelism
on all other stages after shuffles).

The tricky part in my case was the interaction between
`spark.default.parallelism` and `beam.bundleSize`. My problem was that the
default parallelism was enforced on inputs too, which split them either too
much or too little. Configuring bundleSize and custom config on the inputs
(e.g. Hadoop input format max/min split size) did the trick.
TransformTranslator does make a decision on the partitioner based on
bundleSize, but I am not sure how it is used later on:
https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java#L571
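
To make the "too much or too little" effect concrete, here is a rough
sketch of the arithmetic as I understand it. This is not Beam's actual code:
the class and method names are made up for illustration, and it assumes that
a bundle size greater than zero is used as the target split size, while zero
means the input is divided evenly by the default parallelism.

```java
public class SplitSketch {

    // Hypothetical helper (not part of Beam): estimates how many splits a
    // bounded input of totalBytes would be cut into.
    // Assumption: bundleSizeBytes > 0 fixes the target split size; otherwise
    // the target is totalBytes / defaultParallelism.
    static long estimateSplits(long totalBytes, long bundleSizeBytes, int defaultParallelism) {
        if (totalBytes <= 0) {
            return 1;
        }
        long targetSplitBytes = bundleSizeBytes > 0
                ? bundleSizeBytes
                : Math.max(1L, totalBytes / Math.max(1, defaultParallelism));
        // Ceiling division: a trailing partial split still counts as a split.
        return (totalBytes + targetSplitBytes - 1) / targetSplitBytes;
    }

    public static void main(String[] args) {
        long mib = 1024L * 1024L;
        long total = 100L * 1024L * mib; // 100 GiB of input

        // No bundle size: the parallelism dictates the split count,
        // so a low value yields a few very large splits.
        System.out.println(estimateSplits(total, 0L, 8)); // 8

        // Explicit 128 MiB bundle size: the split count follows the data
        // size, whatever the default parallelism is set to.
        System.out.println(estimateSplits(total, 128L * mib, 20000)); // 800
    }
}
```

Under these assumptions, setting the bundle size decouples how the inputs
are split from the parallelism used on the stages after a shuffle, which is
what we wanted.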

On Thu, Oct 3, 2019 at 9:25 AM Tim Robertson <[email protected]>
wrote:

> Hi all,
>
> We haven't dug enough into this to know where to log issues, but I'll
> start by sharing here.
>
> After upgrading from Beam 2.10.0 to 2.15.0 we see issues on SparkRunner -
> we suspect all of this is related.
>
> 1. spark.default.parallelism is not respected
>
> 2. File writing (Avro) with dynamic destinations (grouped into folders by
> a field name) consistently fails with
> org.apache.beam.sdk.util.UserCodeException:
> java.nio.file.FileAlreadyExistsException: Unable to rename resource
> hdfs://ha-nn/pipelines/export-20190930-0854/.temp-beam-d4fd89ed-fc7a-4b1e-aceb-68f9d72d50f0/6e086f60-8bda-4d0e-b29d-1b47fdfc88c0
> to
> hdfs://ha-nn/pipelines/export-20190930-0854/7c9d2aec-f762-11e1-a439-00145eb45e9a/verbatimHBaseExport-00000-of-00001.avro
> as destination already exists and couldn't be deleted.
>
> 3. GBK operations that run over 500M small records consistently fail with
> OOM. We tried different configs with 48GB, 60GB, 80GB executor memory
>
> Our pipelines are batch, simple transformations with either an
> HBaseSnapshot to Avro files or a merge of records in Avro (the GBK issue)
> pushed to ElasticSearch (it fails upstream of the ElasticsearchIO in the
> GBK stage).
>
> We notice operations that were mapToPair in 2.10.0 become repartition
> operations (mapToPair at GroupCombineFunctions.java:68 becomes
> repartition at GroupCombineFunctions.java:202), which might be related to
> this and looks surprising.
>
> I'll report more as we learn. If anyone has any immediate ideas based on
> their commits or reviews, or if you wish any tests run on other Beam
> versions, please say.
>
> Thanks,
> Tim
