You could try rdd.persist(StorageLevel.MEMORY_AND_DISK) (or DISK_ONLY) before the
flatMap(...). The MEMORY_AND_DISK storage level means Spark will try to keep the
data in memory, and whatever doesn't fit will be spilled to disk.
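
For example, a rough sketch in Scala (the input path, the "lines" name, and the
split logic are placeholders, not taken from your job):

    import org.apache.spark.storage.StorageLevel

    // Persist the input RDD so partitions that don't fit in memory are spilled to disk.
    val lines = sc.textFile("hdfs:///path/to/input")
      .persist(StorageLevel.MEMORY_AND_DISK)

    // Run the flatMap on the persisted input; persisting the result the same way
    // lets its partitions be spilled to disk as they are computed.
    val expanded = lines
      .flatMap(line => line.split(" "))
      .persist(StorageLevel.MEMORY_AND_DISK)

    expanded.count()  // any action, just to materialize the result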

Thanks
Best Regards

On Mon, Jun 1, 2015 at 11:02 PM, octavian.ganea <octavian.ga...@inf.ethz.ch>
wrote:

> Hi,
>
> Is there any way to force the output RDD of a flatMap op to be stored in
> both memory and disk as it is computed? My RAM would not be able to fit the
> entire output of flatMap, so it really needs to start using the disk after
> the RAM gets full. I didn't find any way to force this.
>
> Also, what is the memory overhead of flatMap? From my computations, the
> output RDD should fit in memory, but I get the following error after a
> while (and I know it's because of memory issues, since running the program
> with 1/3 of the input data finishes successfully):
>
> 15/06/01 19:02:49 ERROR BlockFetcherIterator$BasicBlockFetcherIterator:
> Could not get block(s) from ConnectionManagerId(dco-node036-mgt.dco.ethz.ch,57478)
> java.io.IOException: sendMessageReliably failed because ack was not received within 60 sec
>         at org.apache.spark.network.ConnectionManager$$anon$10$$anonfun$run$15.apply(ConnectionManager.scala:866)
>         at org.apache.spark.network.ConnectionManager$$anon$10$$anonfun$run$15.apply(ConnectionManager.scala:865)
>         at scala.Option.foreach(Option.scala:236)
>         at org.apache.spark.network.ConnectionManager$$anon$10.run(ConnectionManager.scala:865)
>         at io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:581)
>         at io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:656)
>         at io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:367)
>         at java.lang.Thread.run(Thread.java:745)
>
>
> Also, I've seen this:
> https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence
> but my understanding is that one should apply something like
> rdd.flatMap(...).persist(MEMORY_AND_DISK), which assumes that the entire
> output of flatMap is first stored in memory (which is not possible in my
> case) and is written to disk only once it's done. Please correct me if I'm
> wrong. Anyway, I've tried using this, but I got the same error.
>
> My config:
>
>     conf.set("spark.cores.max", "128")
>     conf.set("spark.akka.frameSize", "1024")
>     conf.set("spark.executor.memory", "125g")
>     conf.set("spark.shuffle.file.buffer.kb", "1000")
>     conf.set("spark.shuffle.consolidateFiles", "true")
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-tp23098.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
