The DAG formatted strangely; the intended formatting was:

PCollectionA -> PCollectionB -> PCollectionC
                            \-> PCollectionD
Jan
On 5/15/19 11:19 AM, Jan Lukavský wrote:
Hi,
I think this thread is another manifestation of a problem discussed
recently in [1]. Long story short - in certain situations users might
legitimately need finer control over how their pipeline is translated
into the runner's operators. Caching is another case where the
pipeline alone doesn't contain enough information to perform the
correct optimization - consider for example this DAG of operations:
PCollectionA -> PCollectionB -> PCollectionC
                            \-> PCollectionD
That is, PCollectionB is consumed by two operators - those producing
PCollectionC and PCollectionD. The logical conclusion would be that we
need to cache PCollectionB. But what if the transform that produced
PCollectionB is of the "cheap explosion" type - PCollectionB is
significantly bigger than PCollectionA, yet can be produced very
cheaply from the elements of PCollectionA? Then it would make sense to
cache PCollectionA instead. But you cannot know that just from the DAG.
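To make the shape concrete, here is a minimal sketch of such a
pipeline in the Beam Java SDK (body of a main method; the source and
transform names are made up for illustration):

import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());

// pCollectionA: relatively small input.
PCollection<String> pCollectionA =
    pipeline.apply("Read", TextIO.read().from("input.txt"));

// "Cheap explosion": each element fans out into many, so pCollectionB
// is much larger than pCollectionA but cheap to recompute from it.
PCollection<String> pCollectionB =
    pCollectionA.apply("Explode",
        FlatMapElements.into(TypeDescriptors.strings())
            .via((String line) -> Arrays.asList(line.split(","))));

// pCollectionB is consumed twice, so the Spark runner caches it by
// default - even though caching pCollectionA and recomputing the
// explosion would use far less memory here.
PCollection<Long> pCollectionC =
    pCollectionB.apply("Count", Count.globally());
PCollection<String> pCollectionD =
    pCollectionB.apply("NonEmpty", Filter.by((String s) -> !s.isEmpty()));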
There are many more examples like this. Maybe we could think about
how to support this kind of control, which might help widen the user base.
Jan
[1] https://www.mail-archive.com/user@beam.apache.org/msg03809.html
On 5/15/19 10:39 AM, Robert Bradshaw wrote:
Just to clarify, do you need direct control over what to cache, or
would it be OK to let Spark decide the minimal set of RDDs to cache as
long as we didn't cache all intermediates?
From: Augusto Ribeiro <augusto....@gmail.com>
Date: Wed, May 15, 2019 at 8:37 AM
To: <user@beam.apache.org>
Hi Kyle,
Thanks for the help. It seems I have no choice other than to use
Spark directly, since my job causes immense memory pressure if I
can't decide what to cache.
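For reference, what I need is essentially this in native Spark (rough
sketch, names made up): cache only the small upstream RDD and let the
cheap fan-out be recomputed from it.

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

// Pin only the small upstream RDD; the derived RDD is recomputed
// from it on demand instead of being materialized in memory.
JavaSparkContext sc = new JavaSparkContext("local[*]", "selective-cache");
JavaRDD<String> rddA = sc.textFile("input.txt");
rddA.persist(StorageLevel.MEMORY_ONLY());

JavaRDD<String> rddB = rddA.flatMap(
    line -> java.util.Arrays.asList(line.split(",")).iterator());
long total = rddB.count();                               // action 1
long nonEmpty = rddB.filter(s -> !s.isEmpty()).count();  // action 2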
Best regards,
Augusto
On 14 May 2019, at 18:40, Kyle Weaver <kcwea...@google.com> wrote:
Minor correction: Slack channel is actually #beam-spark
Kyle Weaver | Software Engineer | github.com/ibzib |
kcwea...@google.com | +16502035555
From: Kyle Weaver <kcwea...@google.com>
Date: Tue, May 14, 2019 at 9:38 AM
To: <user@beam.apache.org>
Hi Augusto,
Right now the default behavior is to cache all intermediate RDDs
that are consumed more than once by the pipeline. This can be
disabled with `options.setCacheDisabled(true)` [1], but there is
currently no way for the user to tell the runner to cache certain
RDDs and not others.
There has recently been some discussion on the Slack (#spark-beam)
about implementing such a feature, but no concrete plans as of yet.
[1]
https://github.com/apache/beam/blob/81faf35c8a42493317eba9fa1e7b06fb42d54662/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java#L150
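Disabling it looks roughly like this (a minimal sketch; the usual
PipelineOptionsFactory wiring from command-line args is assumed):

import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Turn off the Spark runner's automatic caching of reused RDDs.
SparkPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
options.setCacheDisabled(true);

Pipeline pipeline = Pipeline.create(options);
// ... build the pipeline ...
pipeline.run().waitUntilFinish();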
Thanks
Kyle Weaver | Software Engineer | github.com/ibzib |
kcwea...@google.com | +16502035555
From: augusto....@gmail.com <augusto....@gmail.com>
Date: Tue, May 14, 2019 at 5:01 AM
To: <user@beam.apache.org>
Hi,
I guess the title says it all: right now it seems that Beam caches
all the intermediate RDD results for my pipeline when using the
Spark runner, which leads to very inefficient memory usage. Is
there any way to control this?
Best regards,
Augusto