Re: Problem with gzip
On Wed, May 15, 2019 at 8:43 PM Allie Chen wrote:
> Thanks all for your replies. I will try each of them and see how it goes.
>
> The experiment I am working on now is similar to
> https://stackoverflow.com/questions/48886943/early-results-from-groupbykey-transform,
> which tries to get early results from GroupByKey with windowing. I have
> some code like:
>
>     Reading
>     | beam.WindowInto(beam.window.GlobalWindows(),
>                       trigger=trigger.Repeatedly(trigger.AfterCount(1)),
>                       accumulation_mode=beam.transforms.trigger.AccumulationMode.DISCARDING)
>     | MapWithAKey
>     | GroupByKey
>     | RemoveKey
>     | OtherTransforms
>
> I don't see the window and trigger working; GroupByKey still waits for all
> elements. I also tried adding a timestamp to each element and using a
> fixed-size window. It seems to have no impact.
>
> Does anyone know how to get early results from GroupByKey for a bounded
> source?

Note that this is essentially how Reshuffle() is implemented. However, batch never gives early results from a GroupByKey; each stage is executed sequentially. Is the goal here to be able to parallelize the Read with other operations? If the Read (and the limited-parallelism write) is still the bottleneck, that might not help much.
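To illustrate Robert's remark that this keying/grouping pattern is essentially how Reshuffle() works, here is a minimal in-memory model in plain Python (no Beam). It assumes a random-key variant of the pattern; `toy_reshuffle`, `num_buckets`, and `seed` are hypothetical names for illustration, and a real runner spreads the groups across workers instead of a local dict. As with a batch GroupByKey, no group is complete, and so nothing is emitted, until the entire input has been consumed.

```python
import random
from collections import defaultdict
from typing import Dict, Iterable, List, TypeVar

T = TypeVar("T")


def toy_reshuffle(elements: Iterable[T], num_buckets: int = 4, seed: int = 0) -> List[T]:
    """In-memory model of key -> GroupByKey -> drop key (not Beam's code)."""
    rng = random.Random(seed)
    # MapWithAKey: pair every element with a random key.
    keyed = [(rng.randrange(num_buckets), e) for e in elements]
    # GroupByKey: a group is only complete once every element has been seen,
    # so nothing can be emitted before the whole input is consumed.
    groups: Dict[int, List[T]] = defaultdict(list)
    for key, value in keyed:
        groups[key].append(value)
    # RemoveKey: flatten the groups back into a single collection.
    return [value for key in sorted(groups) for value in groups[key]]


if __name__ == "__main__":
    data = list(range(10))
    out = toy_reshuffle(data)
    print(sorted(out) == data)  # True: same elements, possibly reordered
```

Only the grouping (and therefore the order) changes; the multiset of elements is preserved, which is the property a reshuffle relies on.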
Re: PubSubIO watermark not advancing for low volumes
Thanks! I made a Jira ticket: https://issues.apache.org/jira/browse/BEAM-7322
and dumped my sample code here: https://github.com/tims/beam/tree/master/pubsub-watermark

*From: *Alexey Romanenko
*Date: *Wed, May 15, 2019 at 12:18 AM
*To: *

> Not sure that this can be very helpful, but I recall a similar issue with
> KinesisIO [1] [2], and it was a bug in MovingFunction which was fixed.
>
> [1] https://issues.apache.org/jira/browse/BEAM-5063
> [2] https://github.com/apache/beam/pull/6178
>
> On 13 May 2019, at 20:52, Kenneth Knowles wrote:
>
>> You should definitely not feel foolish. That was a great report. I expect
>> many users face the same situation. If they are lurking on this list, then
>> you will have helped them already.
>>
>> Reza - I expect you should weigh in on the Jira, too, since the "one
>> message test" use case seems like it wouldn't work at all with those
>> MovingFunction params. But I may not understand all the subtleties of the
>> connector.
>>
>> Kenn
>>
>> *From: *Tim Sell
>> *Date: *Mon, May 13, 2019 at 8:06 AM
>> *To: *
>>
>>> Thanks for the feedback. I did some more investigating after you said a
>>> 1 second frequency should be enough to sample on... and it is. I feel
>>> foolish. I think I just wasn't waiting long enough, as it takes minutes to
>>> close the windows. We waited much longer when we were just sending
>>> messages manually and never had a window close.
>>>
>>> I'm generating some stats of lag times to window closing for different
>>> frequencies, with code so people can reproduce it, then I'll add this to a
>>> Jira ticket.
>>>
>>> *From: *Kenneth Knowles
>>> *Date: *Mon, May 13, 2019 at 10:48 AM
>>> *To: *, dev
>>>
>>>> Nice analysis & details!
>>>>
>>>> Thanks to your info, I think it is the configuration of MovingFunction
>>>> [1] that is the likely culprit, but I don't totally understand why. It is
>>>> configured like so:
>>>>
>>>> - store 60 seconds of data
>>>> - update data every 5 seconds
>>>> - require at least 10 messages to be 'significant'
>>>> - require messages from at least 2 distinct 5 second update periods to
>>>>   be 'significant'
>>>>
>>>> I would expect a rate of 1 message per second to satisfy this. I may
>>>> have read something wrong.
>>>>
>>>> Have you filed an issue in Jira [2]?
>>>>
>>>> Kenn
>>>>
>>>> [1] https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L508
>>>> [2] https://issues.apache.org/jira/projects/BEAM/issues
>>>>
>>>> *From: *Tim Sell
>>>> *Date: *Fri, May 10, 2019 at 4:09 AM
>>>> *To: *
>>>>
>>>>> Hello,
>>>>>
>>>>> I have identified an issue where the watermark does not advance when
>>>>> using the Beam PubsubIO when volumes are very low.
>>>>>
>>>>> The behaviour is easily replicated if you apply a fixed window
>>>>> triggering after the watermark passes the end of the window.
>>>>>
>>>>>     pipeline.apply(PubsubIO.readStrings().fromSubscription(subscription))
>>>>>         .apply(ParDo.of(new ParseScoreEventFn()))
>>>>>         .apply(Window.into(FixedWindows.of(Duration.standardSeconds(60)))
>>>>>             .triggering(AfterWatermark.pastEndOfWindow())
>>>>>             .withAllowedLateness(Duration.standardSeconds(60))
>>>>>             .discardingFiredPanes())
>>>>>         .apply(MapElements.into(kvs(strings(), integers()))
>>>>>             .via(scoreEvent -> KV.of(scoreEvent.getPlayer(), scoreEvent.getScore())))
>>>>>         .apply(Count.perKey())
>>>>>         .apply(ParDo.of(Log.of("counted per key")));
>>>>>
>>>>> With this triggering, using both the Flink local runner and the direct
>>>>> runner, *no panes will ever be emitted* if the volume of messages in
>>>>> Pub/Sub is very low, e.g. 1 per second.
>>>>>
>>>>> If I change the triggering to have early firings, I get exactly the
>>>>> emitted panes that you would expect.
>>>>>
>>>>>     .apply(Window.into(FixedWindows.of(Duration.standardSeconds(60)))
>>>>>         .triggering(AfterWatermark.pastEndOfWindow()
>>>>>             .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>>>>>                 .alignedTo(Duration.standardSeconds(60))))
>>>>>         .withAllowedLateness(Duration.standardSeconds(60))
>>>>>         .discardingFiredPanes())
>>>>>
>>>>> I can use any variation of early firing triggers and they work as
>>>>> expected.
>>>>>
>>>>> We believe that the watermark is not advancing at all when the volume is
>>>>> too low because of the sampling that PubsubIO does to determine its
>>>>> watermark. It just never has a large enough sample.
>>>>>
>>>>> This problem occurs in the direct runner and the Flink runner, but not in
>>>>> the Dataflow runner (because Dataflow uses its own PubsubIO: Dataflow has
>>>>> access to internal details of Pub/Sub and so doesn't need to do any
>>>>> sampling).
>>>>>
>>>>> I have also verified that for high volumes of messages, PubsubIO *does*
>>>>> successfully advance the watermark.
>>>>>
>>>>> Here's a Python script I wrote to mass produce random messages:
>>>>>
>>>>>     import json
>>>>>     import random
>>>>>
>>>>>     from google.cloud import pubsub_v1
>>>>>
>>>>>     def publish_loop(n, proj
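Kenn's expectation that 1 message per second should satisfy the listed MovingFunction parameters can be checked with simple arithmetic. The sketch below is a simplified, hypothetical model of those four conditions (60 s of history, 5 s buckets, at least 10 samples, at least 2 distinct buckets); it is not a port of Beam's `MovingFunction` or `PubsubUnboundedSource`, and `is_significant` / `first_significant_second` are made-up names. Under this model a steady 1 msg/s stream passes after about ten seconds, while a stream of fewer than 10 messages per minute never does, which is consistent with Kenn's remark that a "one message test" could not work with these parameters.

```python
import math
from typing import List

# Parameters as listed in Kenn's message (modelled here, not Beam's code).
WINDOW_SECONDS = 60  # store 60 seconds of data
BUCKET_SECONDS = 5   # update data every 5 seconds
MIN_SAMPLES = 10     # at least 10 messages
MIN_BUCKETS = 2      # messages from at least 2 distinct 5-second periods


def is_significant(arrival_times: List[float], now: float) -> bool:
    """Hypothetical significance test over the last WINDOW_SECONDS of arrivals."""
    recent = [t for t in arrival_times if now - WINDOW_SECONDS < t <= now]
    buckets = {math.floor(t / BUCKET_SECONDS) for t in recent}
    return len(recent) >= MIN_SAMPLES and len(buckets) >= MIN_BUCKETS


def first_significant_second(rate_per_second: float, horizon: int = 120) -> int:
    """First whole second at which a steady stream passes the test, or -1."""
    interval = 1.0 / rate_per_second
    count = int(horizon * rate_per_second) + 1
    arrivals = [i * interval for i in range(count)]
    for now in range(horizon + 1):
        if is_significant(arrivals, float(now)):
            return now
    return -1


if __name__ == "__main__":
    print(first_significant_second(1.0))  # 9: ten messages (t=0..9) in two buckets
    print(first_significant_second(0.1))  # -1: at most 6 messages per 60 s window
```

Because the model needs 10 samples inside a 60-second window, any steady rate below one message every 6 seconds can never become significant, no matter how long you wait.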
Re: Problem with gzip
Interesting thread. Thanks for digging that up. I would try the shuffle_mode=service experiment (I forgot that wasn't yet the default). If that doesn't do the trick, then although Avro as a materialization format does not provide perfect parallelism, it should be significantly better than what you have now (large gzip files) and may be good enough.

On Wed, May 15, 2019 at 2:34 PM Michael Luckey wrote:
>
> @Robert
>
> Does your suggestion imply that the points made by Eugene on BEAM-2803 do
> not apply (anymore) and the combined reshuffle could just be omitted?
>
> On Wed, May 15, 2019 at 1:00 PM Robert Bradshaw wrote:
>> [...]
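The reason large gzip files limit parallelism is that gzip is not splittable: a standard gzip stream can only be decoded from its beginning, so a reader cannot hand different byte ranges of one file to different workers. The stdlib-only sketch below (the helper name `decodes_from` is hypothetical) shows that decoding from offset 0 succeeds while decoding from the middle of the same compressed data fails.

```python
import gzip
import zlib


def decodes_from(blob: bytes, offset: int) -> bool:
    """Return True if the gzip data starting at `offset` decodes without error."""
    try:
        gzip.decompress(blob[offset:])
        return True
    except (OSError, EOFError, zlib.error):
        # OSError covers gzip.BadGzipFile: there is no gzip header at `offset`.
        return False


if __name__ == "__main__":
    text = "".join(f"record {i}\n" for i in range(10_000)).encode()
    blob = gzip.compress(text)
    print(decodes_from(blob, 0))               # True: start of the gzip stream
    print(decodes_from(blob, len(blob) // 2))  # False: no header mid-stream
```

A splittable intermediate format such as Avro, or many smaller gzip files, avoids this single-reader-per-file limit.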
Re: Problem with gzip
@Robert

Does your suggestion imply that the points made by Eugene on BEAM-2803 do not apply (anymore) and the combined reshuffle could just be omitted?

On Wed, May 15, 2019 at 1:00 PM Robert Bradshaw wrote:
> [...]
> It could be that materializing to temporary files is cheaper than
> materializing randomly to shuffle (especially on pre-portable Python).
> In that case you could force a fusion break with a side input instead.
> [...]
Re: Problem with gzip
Unfortunately, the "write" portion of the reshuffle cannot be parallelized more than the source that it's reading from. In my experience, the read is generally the bottleneck in this case, but it's possible (e.g. if the input compresses extremely well) that it is the write that is slow (which you seem to indicate based on your observation of the UI, right?).

It could be that materializing to temporary files is cheaper than materializing randomly to shuffle (especially on pre-portable Python). In that case, you could force a fusion break with a side input instead, e.g.

    import apache_beam as beam


    class FusionBreak(beam.PTransform):
        def expand(self, pcoll):
            # Create an empty PCollection that depends on pcoll.
            empty = pcoll | beam.FlatMap(lambda x: ())
            # Use this empty PCollection as a side input, which will force
            # a fusion break.
            return pcoll | beam.Map(lambda x, unused: x,
                                    beam.pvalue.AsIterable(empty))

which could be used in place of Reshuffle like

    p | beam.io.ReadFromText(...) | FusionBreak() | DoWork() ...

You'll probably want to be sure to pass the use_fastavro experiment as well.

On Wed, May 15, 2019 at 6:53 AM Niels Basjes wrote:
>
> Hi
>
> This project is a completely different solution to this problem, but in
> the Hadoop MapReduce context.
>
> https://github.com/nielsbasjes/splittablegzip
>
> I have used this a lot in the past.
> Perhaps porting this project to Beam is an option?
>
> Niels Basjes
>
> On Tue, May 14, 2019, 20:45 Lukasz Cwik wrote:
>>
>> Sorry I couldn't be more helpful.
>>
>> From: Allie Chen
>> Date: Tue, May 14, 2019 at 10:09 AM
>> To:
>> Cc: user
>>
>>> Thanks Lukasz. Unfortunately, decompressing the files is not an option
>>> for us.
>>>
>>> I am trying to speed up the Reshuffle step, since it waits for all data.
>>> Here are two ways I have tried:
>>>
>>> 1. Add timestamps to the PCollection's elements after reading (since it
>>> is a bounded source), then apply windowing before Reshuffle, but it
>>> still waits for all data.
>>>
>>> 2. Run the pipeline with the --streaming flag, but it leads to an error:
>>> Workflow failed. Causes: Expected custom source to have non-zero number
>>> of splits. Also, I found in
>>> https://beam.apache.org/documentation/sdks/python-streaming/#dataflowrunner-specific-features:
>>>
>>> DataflowRunner does not currently support the following Cloud Dataflow
>>> specific features with Python streaming execution.
>>>
>>> Streaming autoscaling
>>>
>>> I doubt whether this approach can solve my issue.
>>>
>>> Thanks so much!
>>>
>>> Allie
>>>
>>> From: Lukasz Cwik
>>> Date: Tue, May 14, 2019 at 11:16 AM
>>> To: dev
>>> Cc: user
>>>
Do you need to perform any joins across the files (e.g.
Combine.perKey/GroupByKey/...)? If not, you could structure your pipeline as

    ReadFromFileA --> Reshuffle(optional) --> CopyOfPipelineA
    ReadFromFileB --> Reshuffle(optional) --> CopyOfPipelineB
    ReadFromFileC --> Reshuffle(optional) --> CopyOfPipelineC

and then run it as a batch pipeline.

You can set --streaming=true on the pipeline and then it will run in a
streaming mode, but streaming prioritizes low latency and correctness on
Google Cloud Dataflow, so it will cost more to run your pipeline than in
batch mode. It may make more sense to store the data uncompressed, as it
may be less expensive than paying the additional compute cost for
streaming.

From: Allie Chen
Date: Tue, May 14, 2019 at 7:38 AM
To:
Cc: user

> Is it possible to use windowing or somehow pretend it is streaming so
> Reshuffle or GroupByKey won't wait until all data has been read?
>
> Thanks!
> Allie
>
> From: Lukasz Cwik
> Date: Fri, May 10, 2019 at 5:36 PM
> To: dev
> Cc: user
>
>> There is no such flag to turn off fusion.
>>
>> Writing 100s of GiBs of uncompressed data to reshuffle will take time
>> when it is limited to a small number of workers.
>>
>> If you can split up your input into a lot of smaller files that are
>> compressed, then you shouldn't need to use the reshuffle, but you still
>> could if you found it helped.
>>
>> On Fri, May 10, 2019 at 2:24 PM Allie Chen wrote:
>>>
>>> Re Lukasz: Thanks! I am not able to control the compression format, but
>>> I will see whether splitting the gzip files will work. Is there a
>>> simple flag in Dataflow that could turn off the fusion?
>>>
>>> Re Reuven: No, I checked the run time on the Dataflow UI; the GroupByKey
>>> and FlatMap in Reshuffle are very slow when the data is large.
>>> Reshuffle itself is not parallel either.
>>>
>>> Thanks all,
>>>
>>> Allie
>>>
>>> From: Reuven Lax
>>> Date: Fri, May 10, 2019 at 5:02 PM
>>> To: dev
>>> Cc: user
>>>
It's unlikely that Reshuffle itself takes hours. It's more likely that simply reading an
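Lukasz's suggestion to split the input into many smaller compressed files (which Allie said she would try) could be done as a one-off, streaming pre-processing step. This is a minimal stdlib sketch, assuming newline-delimited records that may be split at any line boundary; `split_gzip`, `lines_per_shard`, and the `shard-NNNNN.gz` naming are hypothetical. It decompresses only in a streaming fashion and never writes data to disk uncompressed, but it still costs one full read and re-compression of the input.

```python
import gzip
import os
import tempfile
from typing import List, Optional


def split_gzip(src_path: str, out_dir: str, lines_per_shard: int) -> List[str]:
    """Split a newline-delimited gzip file into smaller gzip shards.

    The input is streamed line by line, so only one line is held in memory
    at a time and nothing is written to disk uncompressed.
    Returns the shard paths in order.
    """
    if lines_per_shard <= 0:
        raise ValueError("lines_per_shard must be positive")
    shard_paths: List[str] = []
    out: Optional[gzip.GzipFile] = None
    try:
        with gzip.open(src_path, "rb") as src:
            for i, line in enumerate(src):
                if i % lines_per_shard == 0:
                    # Close the previous shard and start a new one.
                    if out is not None:
                        out.close()
                    path = os.path.join(out_dir, f"shard-{len(shard_paths):05d}.gz")
                    shard_paths.append(path)
                    out = gzip.GzipFile(path, "wb")
                out.write(line)
    finally:
        if out is not None:
            out.close()
    return shard_paths


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        big = os.path.join(tmp, "big.gz")
        with gzip.open(big, "wb") as f:
            for i in range(25):
                f.write(f"record {i}\n".encode())
        shards = split_gzip(big, tmp, lines_per_shard=10)
        print(len(shards))  # 3 shards holding 10, 10 and 5 lines
```

The resulting shards can then be matched with a glob by a file-based source such as `ReadFromText`, which should be able to read different files in parallel even though each individual gzip file is still read by a single worker.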
Re: Is there a way to decide what RDDs get cached in the Spark Runner?
The DAG was formatted strangely; the intended formatting was:

    PCollectionA -> PCollectionB -> PCollectionC
                                \-> PCollectionD

Jan

On 5/15/19 11:19 AM, Jan Lukavský wrote:
> [...] consider for example this DAG of operations:
>
> PCollectionA -> PCollectionB -> PCollectionC \-> PCollectionD
>
> [...]
Re: Is there a way to decide what RDDs get cached in the Spark Runner?
Hi,

I think this thread is another manifestation of a problem discussed recently in [1]. Long story short: in certain situations, users might legitimately need finer control over how their pipeline is translated into the runner's operators. Caching is another such case, where the pipeline itself does not contain enough information to perform the correct optimization. Consider, for example, this DAG of operations:

    PCollectionA -> PCollectionB -> PCollectionC
                                \-> PCollectionD

That is, PCollectionB is consumed by two transforms: the ones producing PCollectionC and PCollectionD. The logical conclusion would be that we need to cache PCollectionB. But what if the transform that produced PCollectionB is a "cheap explosion", where PCollectionB is significantly bigger than PCollectionA and at the same time can be produced very cheaply from the elements of PCollectionA? Then it would make sense to cache PCollectionA instead. But you cannot know that just from the DAG. There are many more examples like this. Maybe we could think about how to support this, which might help widen the user base.

Jan

[1] https://www.mail-archive.com/user@beam.apache.org/msg03809.html

On 5/15/19 10:39 AM, Robert Bradshaw wrote:
> Just to clarify, do you need direct control over what to cache, or would
> it be OK to let Spark decide the minimal set of RDDs to cache as long as
> we didn't cache all intermediates?
> [...]
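Jan's "cheap explosion" case can be made concrete with a toy policy. The sketch below is purely hypothetical: the function name `choose_cache`, its parameters, and the policy itself are invented for illustration and are not part of the Spark runner. Caching PCollectionA holds less data but re-runs the cheap A -> B step once per additional consumer of B, while caching PCollectionB avoids recomputation at the cost of holding the larger collection. Which one wins depends on sizes and costs that, as Jan points out, the DAG alone does not reveal.

```python
def choose_cache(size_a: int, size_b: int, explode_seconds: float,
                 consumers_of_b: int, recompute_budget_seconds: float) -> str:
    """Pick which PCollection to cache under a hypothetical cost policy.

    Caching B: B is materialized once and reused by every consumer.
    Caching A: the A -> B step is re-run for each consumer beyond the first.
    Prefer the smaller cache if the extra recomputation fits in the budget.
    """
    extra_recompute = explode_seconds * max(consumers_of_b - 1, 0)
    if size_a < size_b and extra_recompute <= recompute_budget_seconds:
        return "PCollectionA"
    return "PCollectionB"


if __name__ == "__main__":
    # Cheap explosion: B is 50x larger than A and quick to regenerate.
    print(choose_cache(1_000, 50_000, 30.0, 2, 60.0))   # PCollectionA
    # Expensive transform: recomputing B would exceed the budget.
    print(choose_cache(1_000, 50_000, 600.0, 2, 60.0))  # PCollectionB
```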
Re: Is there a way to decide what RDDs get cached in the Spark Runner?
Just to clarify, do you need direct control over what to cache, or would it be OK to let Spark decide the minimal set of RDDs to cache as long as we didn't cache all intermediates?

From: Augusto Ribeiro
Date: Wed, May 15, 2019 at 8:37 AM
To:

> Hi Kyle,
>
> Thanks for the help. It seems like I have no other choice than using Spark
> directly, since my job causes immense memory pressure if I can't decide what
> to cache.
>
> Best regards,
> Augusto
>
> On 14 May 2019, at 18:40, Kyle Weaver wrote:
>
> Minor correction: Slack channel is actually #beam-spark
>
> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com |
> +1650203
>
> From: Kyle Weaver
> Date: Tue, May 14, 2019 at 9:38 AM
> To:
>
>> Hi Augusto,
>>
>> Right now the default behavior is to cache all intermediate RDDs that are
>> consumed more than once by the pipeline. This can be disabled with
>> `options.setCacheDisabled(true)` [1], but there is currently no way for the
>> user to specify to the runner that it should cache certain RDDs, but not
>> others.
>>
>> There has recently been some discussion on the Slack (#spark-beam) about
>> implementing such a feature, but no concrete plans as of yet.
>>
>> [1] https://github.com/apache/beam/blob/81faf35c8a42493317eba9fa1e7b06fb42d54662/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java#L150
>>
>> Thanks
>>
>> From: augusto@gmail.com
>> Date: Tue, May 14, 2019 at 5:01 AM
>> To:
>>
>>> Hi,
>>>
>>> I guess the title says it all: right now it seems like Beam caches all
>>> the intermediate RDD results for my pipeline when using the Spark runner,
>>> which leads to very inefficient memory usage. Is there any way to control
>>> this?
>>>
>>> Best regards,
>>> Augusto
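Kyle's description of the default (cache every intermediate RDD that is consumed more than once) amounts to a fan-out count over the pipeline graph. Here is a minimal plain-Python sketch under that reading; it models the rule as stated in this thread rather than the Spark runner's actual code, and `multi_consumer_nodes` is a hypothetical name. Applied to the A -> B -> {C, D} DAG discussed in this thread, it selects only PCollectionB, which is exactly the choice Jan's "cheap explosion" example questions.

```python
from collections import Counter
from typing import Iterable, Set, Tuple


def multi_consumer_nodes(edges: Iterable[Tuple[str, str]]) -> Set[str]:
    """Return the nodes that feed more than one downstream transform.

    `edges` are (producer, consumer) pairs describing the pipeline DAG.
    """
    fan_out = Counter(producer for producer, _ in edges)
    return {node for node, count in fan_out.items() if count > 1}


if __name__ == "__main__":
    # The example DAG: A -> B, and B feeds both C and D.
    dag = [("PCollectionA", "PCollectionB"),
           ("PCollectionB", "PCollectionC"),
           ("PCollectionB", "PCollectionD")]
    print(multi_consumer_nodes(dag))  # {'PCollectionB'}
```

A purely structural rule like this cannot see element sizes or recomputation costs, which is why Robert's "minimal set" and Jan's cost-aware choice can disagree.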