The DAG formatted strangely, the intended formatting was

PCollectionA -> PCollectionB -> PCollectionC

                            \-> PCollectionD

Jan

On 5/15/19 11:19 AM, Jan Lukavský wrote:
Hi,

I think this thread is another manifestation of a problem discussed recently in [1]. Long story short - users in certain situations might legitimately need finer control over how is their pipeline translated into runner's operators. The case of caching is another one, where looking at the pipeline itself doesn't contain enough information to perform correct optimization - consider for example this DAG of operations:

 PCollectionA -> PCollectionB -> PCollectionC

                                              \-> PCollectionD

That is PCollectionB is consumed by two operators - PCollectionC and PCollectionD. Logical conclusion would be that we need to cache PCollectionB, but what if the transform that produced PCollectionB is of type "cheap explosion" - where PCollectionB is significantly bigger than PCollectionA and at the same time can be produced very cheaply from elements of PCollectionA? Than it would make sense to cache PCollectionA instead. But you cannot know that just from the DAG. There would be many more examples of this. Maybe we could think about how to support this, which might help widen the user base.

Jan

[1] https://www.mail-archive.com/user@beam.apache.org/msg03809.html

On 5/15/19 10:39 AM, Robert Bradshaw wrote:
Just to clarify, do you need direct control over what to cache, or
would it be OK to let Spark decide the minimal set of RDDs to cache as
long as we didn't cache all intermediates?

From: Augusto Ribeiro <augusto....@gmail.com>
Date: Wed, May 15, 2019 at 8:37 AM
To: <user@beam.apache.org>

Hi Kyle,

Thanks for the help. It seems like I have no other choice than using Spark directly, since my job causes immense memory pressure if I can't decide what to cache.

Best regards,
Augusto

On 14 May 2019, at 18:40, Kyle Weaver <kcwea...@google.com> wrote:

Minor correction: Slack channel is actually #beam-spark

Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com | +16502035555


From: Kyle Weaver <kcwea...@google.com>
Date: Tue, May 14, 2019 at 9:38 AM
To: <user@beam.apache.org>

Hi Augusto,

Right now the default behavior is to cache all intermediate RDDs that are consumed more than once by the pipeline. This can be disabled with `options.setCacheDisabled(true)` [1], but there is currently no way for the user to specify to the runner that it should cache certain RDDs, but not others.

There has recently been some discussion on the Slack (#spark-beam) about implementing such a feature, but no concrete plans as of yet.

[1] https://github.com/apache/beam/blob/81faf35c8a42493317eba9fa1e7b06fb42d54662/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java#L150

Thanks

Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com | +16502035555


From: augusto....@gmail.com <augusto....@gmail.com>
Date: Tue, May 14, 2019 at 5:01 AM
To: <user@beam.apache.org>

Hi,

I guess the title says it all, right now it seems like BEAM caches all the intermediate RDD results for my pipeline when using the Spark runner, this leads to a very inefficient usage of memory. Any way to control this?

Best regards,
Augusto

Reply via email to