Re: Problem with gzip

2019-05-15 Thread Robert Bradshaw
On Wed, May 15, 2019 at 8:43 PM Allie Chen  wrote:

> Thanks all for your replies. I will try each of them and see how it goes.
>
> The experiment I am working on now is similar to
> https://stackoverflow.com/questions/48886943/early-results-from-groupbykey-transform,
> which tries to get early results from GroupByKey with windowing. I have
> some code like:
>
> Reading | beam.WindowInto(beam.window.GlobalWindows(),
>              trigger=trigger.Repeatedly(trigger.AfterCount(1)),
>              accumulation_mode=beam.transforms.trigger.AccumulationMode.DISCARDING)
>
> | MapWithAKey
>
> | GroupByKey
>
> | RemoveKey
>
> | OtherTransforms
>
>
> I don't see the window and trigger working; GroupByKey still waits for all
> elements. I also tried adding a timestamp to each element and using a
> fixed-size window, but that seems to have no impact.
>
>
> Does anyone know how to get early results from GroupByKey for a bounded
> source?
>

Note that this is essentially how Reshuffle() is implemented. However,
batch never gives early results from a GroupByKey; each stage is executed
sequentially.

Is the goal here to be able to parallelize the Read with other operations?
If the Read (and limited-parallelism write) is still the bottleneck, that
might not help much.
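For reference, a rough sketch of that pattern in the Python SDK (not the actual
Reshuffle source; the keying scheme here is just for illustration). On a batch
runner the GroupByKey below still waits for the whole input, which is the point
above:

    import apache_beam as beam
    from apache_beam.transforms import trigger, window

    class ReshuffleLikeSketch(beam.PTransform):
        """Reshuffle-style fusion break: key, trigger per element, group, unkey."""
        def expand(self, pcoll):
            return (
                pcoll
                | "AddKeys" >> beam.Map(lambda x: (hash(x) % 100, x))
                | "Window" >> beam.WindowInto(
                    window.GlobalWindows(),
                    trigger=trigger.Repeatedly(trigger.AfterCount(1)),
                    accumulation_mode=trigger.AccumulationMode.DISCARDING)
                | "Group" >> beam.GroupByKey()
                | "DropKeys" >> beam.FlatMap(lambda kv: kv[1]))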


Re: PubSubIO watermark not advancing for low volumes

2019-05-15 Thread Tim Sell
Thanks!

I made a jira
https://issues.apache.org/jira/browse/BEAM-7322

And dumped my sample code here:
https://github.com/tims/beam/tree/master/pubsub-watermark

*From: *Alexey Romanenko 
*Date: *Wed, May 15, 2019 at 12:18 AM
*To: * 

Not sure this will be very helpful, but I recall a similar issue with
> KinesisIO [1][2]; it was a bug in MovingFunction, which was fixed.
>
> [1] https://issues.apache.org/jira/browse/BEAM-5063
> [2] https://github.com/apache/beam/pull/6178
>
> On 13 May 2019, at 20:52, Kenneth Knowles  wrote:
>
> You should definitely not feel foolish. That was a great report. I expect
> many users face the same situation. If they are lurking on this list, then
> you will have helped them already.
>
> Reza - I expect you should weigh in on the Jira, too, since the "one
> message test" use case seems like it wouldn't work at all with those
> MovingFunction params. But I may not understand all the subtleties of the
> connector.
>
> Kenn
>
> *From: *Tim Sell 
> *Date: *Mon, May 13, 2019 at 8:06 AM
> *To: * 
>
> Thanks for the feedback. I did some more investigating after you said a
>> 1-second frequency should be enough to sample on, and it is. I feel foolish.
>> I think I just wasn't waiting long enough, as it takes minutes to close
>> the windows. We waited much longer when we were just sending messages
>> manually and never had a window close.
>>
>> I'm generating some stats of lag times to window closing for different
>> frequencies, with code so people can reproduce it, then I'll add this to a
>> jira ticket.
>>
>> *From: *Kenneth Knowles 
>> *Date: *Mon, May 13, 2019 at 10:48 AM
>> *To: * , dev
>>
>> Nice analysis & details!
>>>
>>> Thanks to your info, I think it is the configuration of MovingFunction
>>> [1] that is the likely culprit, but I don't totally understand why. It is
>>> configured like so:
>>>
>>>  - store 60 seconds of data
>>>  - update data every 5 seconds
>>>  - require at least 10 messages to be 'significant'
>>>  - require messages from at least 2 distinct 5-second update periods to
>>> be 'significant'
>>>
>>> I would expect a rate of 1 message per second to satisfy this. I may
>>> have read something wrong.
>>>
>>> Have you filed an issue in Jira [2]?
>>>
>>> Kenn
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L508
>>> [2] https://issues.apache.org/jira/projects/BEAM/issues
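As a quick back-of-the-envelope check of those thresholds (the names below are
just labels, not the actual fields in PubsubUnboundedSource), 1 message per
second does look like it should qualify as 'significant':

    # Would 1 message/second look "significant" under the configuration above?
    window_secs = 60          # MovingFunction keeps 60 seconds of data
    update_period_secs = 5    # updated every 5 seconds
    min_messages = 10         # at least 10 messages required
    min_periods = 2           # spread over at least 2 distinct update periods

    rate_per_sec = 1
    messages = rate_per_sec * window_secs               # 60 messages in the window
    periods_hit = window_secs // update_period_secs     # up to 12 distinct periods

    print(messages >= min_messages and periods_hit >= min_periods)  # True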
>>>
>>> *From: *Tim Sell 
>>> *Date: *Fri, May 10, 2019 at 4:09 AM
>>> *To: * 
>>>
>>> Hello,

 I have identified an issue where the watermark does not advance when
 using the Beam PubsubIO when volumes are very low.

 The behaviour is easily replicated if you apply fixed windows with a
 trigger that fires after the watermark passes the end of the window.

 pipeline.apply(PubsubIO.readStrings().fromSubscription(subscription))
 .apply(ParDo.of(new ParseScoreEventFn()))
 
 .apply(Window.into(FixedWindows.of(Duration.standardSeconds(60)))
 .triggering(AfterWatermark.pastEndOfWindow())
 .withAllowedLateness(Duration.standardSeconds(60))
 .discardingFiredPanes())
 .apply(MapElements.into(kvs(strings(), integers()))
     .via(scoreEvent -> KV.of(scoreEvent.getPlayer(), scoreEvent.getScore())))
 .apply(Count.perKey())
 .apply(ParDo.of(Log.of("counted per key")));

 With this triggering, using both the Flink local runner and the direct
 runner, *no panes will ever be emitted* if the volume of messages in
 Pub/Sub is very low, e.g. 1 per second.

 If I change the triggering to have early firings I get exactly the
 emitted panes that you would expect.

 .apply(Window.into(FixedWindows.of(Duration.standardSeconds(60)))
 .triggering(AfterWatermark.pastEndOfWindow()
 .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
 .alignedTo(Duration.standardSeconds(60))))
 .withAllowedLateness(Duration.standardSeconds(60))
 .discardingFiredPanes())

 I can use any variation of early firing triggers and they work as
 expected.

 We believe that the watermark is not advancing at all when the volume
 is too low because of the sampling that PubsubIO does to determine its
 watermark; it just never has a large enough sample.
 This problem occurs in the direct runner and the Flink runner, but not in
 the Dataflow runner (Dataflow uses its own PubsubIO, which has access to
 internal details of Pub/Sub and so doesn't need to do any sampling).


 I have also verified that for high volumes of messages, PubsubIO
 *does* successfully advance the watermark. Here's a Python script I wrote
 to mass-produce random messages:

 import json
 import random
 from google.cloud import pubsub_v1


 def publish_loop(n, proj
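 The script is cut off by the archive; a minimal sketch of what such a
 publisher might look like, assuming roughly one random score event per
 second (the project/topic names and the message schema are placeholders,
 not taken from the thread):

    import json
    import random
    import time

    from google.cloud import pubsub_v1


    def publish_loop(n, project, topic, delay_secs=1.0):
        # Publish n random score events, roughly one every delay_secs seconds.
        publisher = pubsub_v1.PublisherClient()
        topic_path = publisher.topic_path(project, topic)
        for _ in range(n):
            event = {"player": random.choice(["alice", "bob", "carol"]),
                     "score": random.randint(0, 100)}
            # Block until each publish completes.
            publisher.publish(topic_path, data=json.dumps(event).encode("utf-8")).result()
            time.sleep(delay_secs)


    if __name__ == "__main__":
        publish_loop(600, "my-project", "scores")  # hypothetical project/topic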

Re: Problem with gzip

2019-05-15 Thread Robert Bradshaw
Interesting thread. Thanks for digging that up.

I would try the shuffle_mode=service experiment (I forgot that it wasn't
yet the default). If that doesn't do the trick: although Avro as a
materialization format does not provide perfect parallelism, it should
be significantly better than what you have now (large gzip files) and
may be good enough.
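For example, a sketch of passing those experiments via the Python SDK's
pipeline options (flag spellings as commonly used with Dataflow at the time;
double-check against your SDK version):

    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions([
        "--runner=DataflowRunner",
        "--experiments=shuffle_mode=service",
        "--experiments=use_fastavro",
        # plus the usual --project, --region, --temp_location, etc.
    ])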

On Wed, May 15, 2019 at 2:34 PM Michael Luckey  wrote:
>
> @Robert
>
> Does your suggestion imply that the points made by Eugene on BEAM-2803 no
> longer apply and the combined reshuffle could just be omitted?
>
> On Wed, May 15, 2019 at 1:00 PM Robert Bradshaw  wrote:
>>
>> Unfortunately the "write" portion of the reshuffle cannot be
>> parallelized more than the source that it's reading from. In my
>> experience, generally the read is the bottleneck in this case, but
>> it's possible (e.g. if the input compresses extremely well) that it is
>> the write that is slow (which you seem to indicate based on your
>> observation of the UI, right?).
>>
>> It could be that materializing to temporary files is cheaper than
>> materializing randomly to shuffle (especially on pre-portable Python).
>> In that case you could force a fusion break with a side input instead.
>> E.g.
>>
>> class FusionBreak(beam.PTransform):
>>     def expand(self, pcoll):
>>         # Create an empty PCollection that depends on pcoll.
>>         empty = pcoll | beam.FlatMap(lambda x: ())
>>         # Use this empty PCollection as a side input, which will
>>         # force a fusion break.
>>         return pcoll | beam.Map(lambda x, unused: x,
>>                                 beam.pvalue.AsIterable(empty))
>>
>> which could be used in place of Reshard like
>>
>> p | beam.ReadFromGzipedFiles(...) | FusionBreak() | DoWork() ...
>>
>> You'll probably want to be sure to pass the use_fastavro experiment as well.
>>
>> On Wed, May 15, 2019 at 6:53 AM Niels Basjes  wrote:
>> >
>> > Hi
>> >
>> > This project is a completely different solution towards this problem, but 
>> > in the hadoop mapreduce context.
>> >
>> > https://github.com/nielsbasjes/splittablegzip
>> >
>> >
>> > I have used this a lot in the past.
>> > Perhaps porting this project to beam is an option?
>> >
>> > Niels Basjes
>> >
>> >
>> >
>> > On Tue, May 14, 2019, 20:45 Lukasz Cwik  wrote:
>> >>
>> >> Sorry I couldn't be more helpful.
>> >>
>> >> From: Allie Chen 
>> >> Date: Tue, May 14, 2019 at 10:09 AM
>> >> To: 
>> >> Cc: user
>> >>
>> >>> Thank Lukasz. Unfortunately, decompressing the files is not an option 
>> >>> for us.
>> >>>
>> >>>
>> >>> I am trying to speed up Reshuffle step, since it waits for all data. 
>> >>> Here are two ways I have tried:
>> >>>
>> >>> 1.  Add timestamps to the PCollection's elements after reading (since it
>> >>> is a bounded source), then apply windowing before Reshuffle, but it still
>> >>> waits for all data.
>> >>>
>> >>>
>> >>> 2.  run the pipeline with --streaming flag, but it leads to an error: 
>> >>> Workflow failed. Causes: Expected custom source to have non-zero number 
>> >>> of splits. Also, I found in 
>> >>> https://beam.apache.org/documentation/sdks/python-streaming/#dataflowrunner-specific-features:
>> >>>
>> >>> DataflowRunner does not currently support the following Cloud Dataflow 
>> >>> specific features with Python streaming execution.
>> >>>
>> >>> Streaming autoscaling
>> >>>
>> >>> I doubt whether this approach can solve my issue.
>> >>>
>> >>>
>> >>> Thanks so much!
>> >>>
>> >>> Allie
>> >>>
>> >>>
>> >>> From: Lukasz Cwik 
>> >>> Date: Tue, May 14, 2019 at 11:16 AM
>> >>> To: dev
>> >>> Cc: user
>> >>>
>>  Do you need to perform any joins across the files (e.g. 
>>  Combine.perKey/GroupByKey/...)?
>>  If not, you could structure your pipeline
>>  ReadFromFileA --> Reshuffle(optional) --> CopyOfPipelineA
>>  ReadFromFileB --> Reshuffle(optional) --> CopyOfPipelineB
>>  ReadFromFileC --> Reshuffle(optional) --> CopyOfPipelineC
>>  and then run it as a batch pipeline.
>> 
>>  You can set --streaming=true on the pipeline and then it will run in a
>>  streaming mode, but streaming prioritizes low latency and correctness on
>>  Google Cloud Dataflow, so it will cost more to run your pipeline than in
>>  batch mode. It may make more sense to store the data uncompressed, as it
>>  may be less expensive than paying the additional compute cost for
>>  streaming.
>> 
>>  From: Allie Chen 
>>  Date: Tue, May 14, 2019 at 7:38 AM
>>  To: 
>>  Cc: user
>> 
>> > Is it possible to use windowing or somehow pretend it is streaming so 
>> > Reshuffle or GroupByKey won't wait until all data has been read?
>> >
>> > Thanks!
>> > Allie
>> >
>> > From: Lukasz Cwik 
>> > Date: Fri, May 10, 2019 at 5:36 PM
>> > To: dev
>> > Cc: user
>> >
>> >> There is no such flag to turn off fusion.
>> >>
>> >> Writing 100s of GiBs of uncompressed data to reshuffle will take time
>> >> when it is limited to a small number of workers.

Re: Problem with gzip

2019-05-15 Thread Michael Luckey
@Robert

Does your suggestion imply that the points made by Eugene on BEAM-2803 no
longer apply and the combined reshuffle could just be omitted?

On Wed, May 15, 2019 at 1:00 PM Robert Bradshaw  wrote:

> Unfortunately the "write" portion of the reshuffle cannot be
> parallelized more than the source that it's reading from. In my
> experience, generally the read is the bottleneck in this case, but
> it's possible (e.g. if the input compresses extremely well) that it is
> the write that is slow (which you seem to indicate based on your
> observation of the UI, right?).
>
> It could be that materializing to temporary files is cheaper than
> materializing randomly to shuffle (especially on pre-portable Python).
> In that case you could force a fusion break with a side input instead.
> E.g.
>
> class FusionBreak(beam.PTransform):
>     def expand(self, pcoll):
>         # Create an empty PCollection that depends on pcoll.
>         empty = pcoll | beam.FlatMap(lambda x: ())
>         # Use this empty PCollection as a side input, which will
>         # force a fusion break.
>         return pcoll | beam.Map(lambda x, unused: x,
>                                 beam.pvalue.AsIterable(empty))
>
> which could be used in place of Reshard like
>
> p | beam.ReadFromGzipedFiles(...) | FusionBreak() | DoWork() ...
>
> You'll probably want to be sure to pass the use_fastavro experiment as
> well.
>
> On Wed, May 15, 2019 at 6:53 AM Niels Basjes  wrote:
> >
> > Hi
> >
> > This project is a completely different solution towards this problem,
> but in the hadoop mapreduce context.
> >
> > https://github.com/nielsbasjes/splittablegzip
> >
> >
> > I have used this a lot in the past.
> > Perhaps porting this project to beam is an option?
> >
> > Niels Basjes
> >
> >
> >
> > On Tue, May 14, 2019, 20:45 Lukasz Cwik  wrote:
> >>
> >> Sorry I couldn't be more helpful.
> >>
> >> From: Allie Chen 
> >> Date: Tue, May 14, 2019 at 10:09 AM
> >> To: 
> >> Cc: user
> >>
> >>> Thank Lukasz. Unfortunately, decompressing the files is not an option
> for us.
> >>>
> >>>
> >>> I am trying to speed up Reshuffle step, since it waits for all data.
> Here are two ways I have tried:
> >>>
> >>> 1.  Add timestamps to the PCollection's elements after reading (since
> it is a bounded source), then apply windowing before Reshuffle, but it still
> waits for all data.
> >>>
> >>>
> >>> 2.  run the pipeline with --streaming flag, but it leads to an error:
> Workflow failed. Causes: Expected custom source to have non-zero number of
> splits. Also, I found in
> https://beam.apache.org/documentation/sdks/python-streaming/#dataflowrunner-specific-features
> :
> >>>
> >>> DataflowRunner does not currently support the following Cloud Dataflow
> specific features with Python streaming execution.
> >>>
> >>> Streaming autoscaling
> >>>
> >>> I doubt whether this approach can solve my issue.
> >>>
> >>>
> >>> Thanks so much!
> >>>
> >>> Allie
> >>>
> >>>
> >>> From: Lukasz Cwik 
> >>> Date: Tue, May 14, 2019 at 11:16 AM
> >>> To: dev
> >>> Cc: user
> >>>
>  Do you need to perform any joins across the files (e.g.
> Combine.perKey/GroupByKey/...)?
>  If not, you could structure your pipeline
>  ReadFromFileA --> Reshuffle(optional) --> CopyOfPipelineA
>  ReadFromFileB --> Reshuffle(optional) --> CopyOfPipelineB
>  ReadFromFileC --> Reshuffle(optional) --> CopyOfPipelineC
>  and then run it as a batch pipeline.
> 
>  You can set --streaming=true on the pipeline and then it will run in
> a streaming mode, but streaming prioritizes low latency and correctness on
> Google Cloud Dataflow, so it will cost more to run your pipeline than in
> batch mode. It may make more sense to store the data uncompressed, as it may
> be less expensive than paying the additional compute cost for streaming.
> 
>  From: Allie Chen 
>  Date: Tue, May 14, 2019 at 7:38 AM
>  To: 
>  Cc: user
> 
> > Is it possible to use windowing or somehow pretend it is streaming
> so Reshuffle or GroupByKey won't wait until all data has been read?
> >
> > Thanks!
> > Allie
> >
> > From: Lukasz Cwik 
> > Date: Fri, May 10, 2019 at 5:36 PM
> > To: dev
> > Cc: user
> >
> >> There is no such flag to turn off fusion.
> >>
> >> Writing 100s of GiBs of uncompressed data to reshuffle will take
> time when it is limited to a small number of workers.
> >>
> >> If you can split up your input into a lot of smaller files that are
> compressed then you shouldn't need to use the reshuffle but still could if
> you found it helped.
> >>
> >> On Fri, May 10, 2019 at 2:24 PM Allie Chen 
> wrote:
> >>>
> >>> Re Lukasz: Thanks! I am not able to control the compression format
> but I will see whether the splitting gzip files will work. Is there a
> simple flag in Dataflow that could turn off the fusion?
> >>>
> >>> Re Reuven: No, I checked the run time on the Dataflow UI; the
> GroupByKey and FlatMap in Reshuffle are very slow when the data is large.

Re: Problem with gzip

2019-05-15 Thread Robert Bradshaw
Unfortunately the "write" portion of the reshuffle cannot be
parallelized more than the source that it's reading from. In my
experience, generally the read is the bottleneck in this case, but
it's possible (e.g. if the input compresses extremely well) that it is
the write that is slow (which you seem to indicate based on your
observation of the UI, right?).

It could be that materializing to temporary files is cheaper than
materializing randomly to shuffle (especially on pre-portable Python).
In that case you could force a fusion break with a side input instead.
E.g.

class FusionBreak(beam.PTransform):
    def expand(self, pcoll):
        # Create an empty PCollection that depends on pcoll.
        empty = pcoll | beam.FlatMap(lambda x: ())
        # Use this empty PCollection as a side input, which will
        # force a fusion break.
        return pcoll | beam.Map(lambda x, unused: x,
                                beam.pvalue.AsIterable(empty))

which could be used in place of Reshard like

p | beam.ReadFromGzipedFiles(...) | FusionBreak() | DoWork() ...

You'll probably want to be sure to pass the use_fastavro experiment as well.

On Wed, May 15, 2019 at 6:53 AM Niels Basjes  wrote:
>
> Hi
>
> This project is a completely different solution towards this problem, but in 
> the hadoop mapreduce context.
>
> https://github.com/nielsbasjes/splittablegzip
>
>
> I have used this a lot in the past.
> Perhaps porting this project to beam is an option?
>
> Niels Basjes
>
>
>
> On Tue, May 14, 2019, 20:45 Lukasz Cwik  wrote:
>>
>> Sorry I couldn't be more helpful.
>>
>> From: Allie Chen 
>> Date: Tue, May 14, 2019 at 10:09 AM
>> To: 
>> Cc: user
>>
>>> Thank Lukasz. Unfortunately, decompressing the files is not an option for 
>>> us.
>>>
>>>
>>> I am trying to speed up Reshuffle step, since it waits for all data. Here 
>>> are two ways I have tried:
>>>
>>> 1.  Add timestamps to the PCollection's elements after reading (since it is
>>> a bounded source), then apply windowing before Reshuffle, but it still waits
>>> for all data.
>>>
>>>
>>> 2.  run the pipeline with --streaming flag, but it leads to an error: 
>>> Workflow failed. Causes: Expected custom source to have non-zero number of 
>>> splits. Also, I found in 
>>> https://beam.apache.org/documentation/sdks/python-streaming/#dataflowrunner-specific-features:
>>>
>>> DataflowRunner does not currently support the following Cloud Dataflow 
>>> specific features with Python streaming execution.
>>>
>>> Streaming autoscaling
>>>
>>> I doubt whether this approach can solve my issue.
>>>
>>>
>>> Thanks so much!
>>>
>>> Allie
>>>
>>>
>>> From: Lukasz Cwik 
>>> Date: Tue, May 14, 2019 at 11:16 AM
>>> To: dev
>>> Cc: user
>>>
 Do you need to perform any joins across the files (e.g. 
 Combine.perKey/GroupByKey/...)?
 If not, you could structure your pipeline
 ReadFromFileA --> Reshuffle(optional) --> CopyOfPipelineA
 ReadFromFileB --> Reshuffle(optional) --> CopyOfPipelineB
 ReadFromFileC --> Reshuffle(optional) --> CopyOfPipelineC
 and then run it as a batch pipeline.

 You can set --streaming=true on the pipeline and then it will run in a
 streaming mode, but streaming prioritizes low latency and correctness on
 Google Cloud Dataflow, so it will cost more to run your pipeline than in
 batch mode. It may make more sense to store the data uncompressed, as it
 may be less expensive than paying the additional compute cost for
 streaming.

 From: Allie Chen 
 Date: Tue, May 14, 2019 at 7:38 AM
 To: 
 Cc: user

> Is it possible to use windowing or somehow pretend it is streaming so 
> Reshuffle or GroupByKey won't wait until all data has been read?
>
> Thanks!
> Allie
>
> From: Lukasz Cwik 
> Date: Fri, May 10, 2019 at 5:36 PM
> To: dev
> Cc: user
>
>> There is no such flag to turn off fusion.
>>
>> Writing 100s of GiBs of uncompressed data to reshuffle will take time 
>> when it is limited to a small number of workers.
>>
>> If you can split up your input into a lot of smaller files that are 
>> compressed then you shouldn't need to use the reshuffle but still could 
>> if you found it helped.
>>
>> On Fri, May 10, 2019 at 2:24 PM Allie Chen  wrote:
>>>
>>> Re Lukasz: Thanks! I am not able to control the compression format but 
>>> I will see whether the splitting gzip files will work. Is there a 
>>> simple flag in Dataflow that could turn off the fusion?
>>>
>>> Re Reuven: No, I checked the run time on the Dataflow UI; the GroupByKey
>>> and FlatMap in Reshuffle are very slow when the data is large. 
>>> Reshuffle itself is not parallel either.
>>>
>>> Thanks all,
>>>
>>> Allie
>>>
>>> From: Reuven Lax 
>>> Date: Fri, May 10, 2019 at 5:02 PM
>>> To: dev
>>> Cc: user
>>>
 It's unlikely that Reshuffle itself takes hours. It's more likely that 
 simply reading an

Re: Is there a way to decide what RDDs get cached in the Spark Runner?

2019-05-15 Thread Jan Lukavský

The DAG was formatted strangely; the intended formatting was:

PCollectionA -> PCollectionB -> PCollectionC
                            \-> PCollectionD

Jan

On 5/15/19 11:19 AM, Jan Lukavský wrote:

Hi,

I think this thread is another manifestation of a problem discussed
recently in [1]. Long story short: users in certain situations might
legitimately need finer control over how their pipeline is translated
into a runner's operators. The case of caching is another one, where
the pipeline itself doesn't contain enough information to perform the
correct optimization - consider for example this DAG of operations:

 PCollectionA -> PCollectionB -> PCollectionC
                             \-> PCollectionD

That is, PCollectionB is consumed by two operators - PCollectionC and
PCollectionD. The logical conclusion would be that we need to cache
PCollectionB, but what if the transform that produced PCollectionB is
of type "cheap explosion" - where PCollectionB is significantly bigger
than PCollectionA and at the same time can be produced very cheaply
from elements of PCollectionA? Then it would make sense to cache
PCollectionA instead. But you cannot know that just from the DAG.
There would be many more examples of this. Maybe we could think about
how to support this, which might help widen the user base.


Jan

[1] https://www.mail-archive.com/user@beam.apache.org/msg03809.html

On 5/15/19 10:39 AM, Robert Bradshaw wrote:

Just to clarify, do you need direct control over what to cache, or
would it be OK to let Spark decide the minimal set of RDDs to cache as
long as we didn't cache all intermediates?

From: Augusto Ribeiro 
Date: Wed, May 15, 2019 at 8:37 AM
To: 


Hi Kyle,

Thanks for the help. It seems like I have no other choice than using 
Spark directly, since my job causes immense memory pressure if I 
can't decide what to cache.


Best regards,
Augusto

On 14 May 2019, at 18:40, Kyle Weaver  wrote:

Minor correction: Slack channel is actually #beam-spark

Kyle Weaver | Software Engineer | github.com/ibzib | 
kcwea...@google.com | +1650203



From: Kyle Weaver 
Date: Tue, May 14, 2019 at 9:38 AM
To: 


Hi Augusto,

Right now the default behavior is to cache all intermediate RDDs 
that are consumed more than once by the pipeline. This can be 
disabled with `options.setCacheDisabled(true)` [1], but there is 
currently no way for the user to specify to the runner that it 
should cache certain RDDs, but not others.


There has recently been some discussion on the Slack (#spark-beam) 
about implementing such a feature, but no concrete plans as of yet.


[1] 
https://github.com/apache/beam/blob/81faf35c8a42493317eba9fa1e7b06fb42d54662/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java#L150


Thanks

Kyle Weaver | Software Engineer | github.com/ibzib | 
kcwea...@google.com | +1650203



From: augusto@gmail.com 
Date: Tue, May 14, 2019 at 5:01 AM
To: 


Hi,

I guess the title says it all: right now it seems like Beam caches
all the intermediate RDD results for my pipeline when using the
Spark runner, which leads to very inefficient memory usage.
Is there any way to control this?


Best regards,
Augusto




Re: Is there a way to decide what RDDs get cached in the Spark Runner?

2019-05-15 Thread Jan Lukavský

Hi,

I think this thread is another manifestation of a problem discussed
recently in [1]. Long story short: users in certain situations might
legitimately need finer control over how their pipeline is translated
into a runner's operators. The case of caching is another one, where
the pipeline itself doesn't contain enough information to perform the
correct optimization - consider for example this DAG of operations:

 PCollectionA -> PCollectionB -> PCollectionC
                             \-> PCollectionD

That is, PCollectionB is consumed by two operators - PCollectionC and
PCollectionD. The logical conclusion would be that we need to cache
PCollectionB, but what if the transform that produced PCollectionB is of
type "cheap explosion" - where PCollectionB is significantly bigger than
PCollectionA and at the same time can be produced very cheaply from
elements of PCollectionA? Then it would make sense to cache PCollectionA
instead. But you cannot know that just from the DAG. There would be many
more examples of this. Maybe we could think about how to support this,
which might help widen the user base.


Jan

[1] https://www.mail-archive.com/user@beam.apache.org/msg03809.html
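As a concrete illustration of that shape, a small sketch in the Python SDK
(the transforms and the fan-out factor are made up for illustration):

    import apache_beam as beam

    with beam.Pipeline() as p:
        pcoll_a = p | "Create" >> beam.Create(["a", "b", "c"])
        # "Cheap explosion": each element of A fans out into many elements of B.
        pcoll_b = pcoll_a | "Explode" >> beam.FlatMap(lambda x: [x * i for i in range(1000)])
        # B is consumed twice, so a runner would naively cache B,
        # even though recomputing B from a cached A could be far cheaper.
        pcoll_c = pcoll_b | "Lengths" >> beam.Map(len)
        pcoll_d = pcoll_b | "Prefixes" >> beam.Map(lambda s: s[:1])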

On 5/15/19 10:39 AM, Robert Bradshaw wrote:

Just to clarify, do you need direct control over what to cache, or
would it be OK to let Spark decide the minimal set of RDDs to cache as
long as we didn't cache all intermediates?

From: Augusto Ribeiro 
Date: Wed, May 15, 2019 at 8:37 AM
To: 


Hi Kyle,

Thanks for the help. It seems like I have no other choice than using Spark 
directly, since my job causes immense memory pressure if I can't decide what to 
cache.

Best regards,
Augusto

On 14 May 2019, at 18:40, Kyle Weaver  wrote:

Minor correction: Slack channel is actually #beam-spark

Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com | 
+1650203


From: Kyle Weaver 
Date: Tue, May 14, 2019 at 9:38 AM
To: 


Hi Augusto,

Right now the default behavior is to cache all intermediate RDDs that are 
consumed more than once by the pipeline. This can be disabled with 
`options.setCacheDisabled(true)` [1], but there is currently no way for the 
user to specify to the runner that it should cache certain RDDs, but not others.

There has recently been some discussion on the Slack (#spark-beam) about 
implementing such a feature, but no concrete plans as of yet.

[1] 
https://github.com/apache/beam/blob/81faf35c8a42493317eba9fa1e7b06fb42d54662/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java#L150

Thanks

Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com | 
+1650203


From: augusto@gmail.com 
Date: Tue, May 14, 2019 at 5:01 AM
To: 


Hi,

I guess the title says it all: right now it seems like Beam caches all the
intermediate RDD results for my pipeline when using the Spark runner, which
leads to very inefficient memory usage. Is there any way to control this?

Best regards,
Augusto




Re: Is there a way to decide what RDDs get cached in the Spark Runner?

2019-05-15 Thread Robert Bradshaw
Just to clarify, do you need direct control over what to cache, or
would it be OK to let Spark decide the minimal set of RDDs to cache as
long as we didn't cache all intermediates?

From: Augusto Ribeiro 
Date: Wed, May 15, 2019 at 8:37 AM
To: 

> Hi Kyle,
>
> Thanks for the help. It seems like I have no other choice than using Spark 
> directly, since my job causes immense memory pressure if I can't decide what 
> to cache.
>
> Best regards,
> Augusto
>
> On 14 May 2019, at 18:40, Kyle Weaver  wrote:
>
> Minor correction: Slack channel is actually #beam-spark
>
> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com | 
> +1650203
>
>
> From: Kyle Weaver 
> Date: Tue, May 14, 2019 at 9:38 AM
> To: 
>
>> Hi Augusto,
>>
>> Right now the default behavior is to cache all intermediate RDDs that are 
>> consumed more than once by the pipeline. This can be disabled with 
>> `options.setCacheDisabled(true)` [1], but there is currently no way for the 
>> user to specify to the runner that it should cache certain RDDs, but not 
>> others.
>>
>> There has recently been some discussion on the Slack (#spark-beam) about 
>> implementing such a feature, but no concrete plans as of yet.
>>
>> [1] 
>> https://github.com/apache/beam/blob/81faf35c8a42493317eba9fa1e7b06fb42d54662/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java#L150
>>
>> Thanks
>>
>> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com | 
>> +1650203
>>
>>
>> From: augusto@gmail.com 
>> Date: Tue, May 14, 2019 at 5:01 AM
>> To: 
>>
>>> Hi,
>>>
>>> I guess the title says it all: right now it seems like Beam caches all the
>>> intermediate RDD results for my pipeline when using the Spark runner, which
>>> leads to very inefficient memory usage. Is there any way to control this?
>>>
>>> Best regards,
>>> Augusto
>
>