Re: flatMap output on disk / flatMap memory overhead
Hi Octavian,

Just out of curiosity, did you try persisting your RDD in a serialized format, i.e. "MEMORY_AND_DISK_SER" or "MEMORY_ONLY_SER"? That is, changing your rdd.persist(MEMORY_AND_DISK) to rdd.persist(MEMORY_AND_DISK_SER) or rdd.persist(MEMORY_ONLY_SER).

Regards

On Wed, Jun 10, 2015 at 7:27 AM, Imran Rashid wrote:
> [earlier messages in the thread are quoted in full below; snipped here]
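For concreteness, the suggested change might look like the sketch below (the local SparkContext and the toy RDD are illustrative assumptions, not the poster's actual setup):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Illustrative local setup only; the poster runs on a cluster.
val sc  = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("ser-persist-demo"))
val rdd = sc.parallelize(1 to 1000000)

// MEMORY_AND_DISK_SER stores each partition as a single serialized byte
// buffer (much smaller than deserialized Java objects, at some CPU cost);
// partitions that still don't fit in memory spill to local disk.
val cached = rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
cached.count() // materializes and caches the RDD
```

Serialized levels often shrink the cache footprint by several times for object-heavy RDDs, which is why they are a common first step when plain MEMORY_AND_DISK fails.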
Re: flatMap output on disk / flatMap memory overhead
I agree with Richard. It looks like the issue here is shuffling, and shuffle data is always written to disk, so the issue is definitely not that all the output of flatMap has to be stored in memory.

If at all possible, I'd first suggest upgrading to a newer version of Spark -- even in 1.2, there were big improvements to shuffle, with sort-based shuffle as the default.

On Tue, Jun 2, 2015 at 1:09 PM, Richard Marscher wrote:
> [snip: this message and the ones it quotes appear in full below]
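If upgrading right away isn't feasible, sort-based shuffle can also be enabled explicitly on 1.1.x via configuration; a minimal sketch (in the 1.x line the spark.shuffle.manager key accepts "hash" or "sort", with "sort" becoming the default in 1.2):

```scala
import org.apache.spark.SparkConf

// Opt in to sort-based shuffle on Spark 1.1.x, where hash-based shuffle
// is still the default. Sort-based shuffle writes far fewer shuffle files
// per map task, easing pressure on disks and file handles.
val conf = new SparkConf()
  .set("spark.shuffle.manager", "sort")
```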
Re: flatMap output on disk / flatMap memory overhead
Are you sure it's memory related? What is the disk utilization and I/O performance on the workers? The error you posted looks to be related to shuffle trying to obtain block data from another worker node and failing to do so in a reasonable amount of time. It may still be memory related, but I'm not sure that other resources are ruled out yet.

On Tue, Jun 2, 2015 at 5:10 AM, octavian.ganea wrote:
> [snip: quoted message appears in full below]
Re: flatMap output on disk / flatMap memory overhead
I tried using reduceByKey, without success.

I also tried this: rdd.persist(MEMORY_AND_DISK).flatMap(...).reduceByKey. However, I got the same error as before, namely the error described here:

http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-td23098.html

My task is to count the frequencies of pairs of words that occur at least 5 times in a set of documents. I know that this final output is sparse and should comfortably fit in memory. However, the intermediate pairs that are spilled by flatMap might need to be stored on disk, but I don't understand why the persist option does not work and my job fails.

My code:

rdd.persist(StorageLevel.MEMORY_AND_DISK)
   .flatMap(x => outputPairsOfWords(x)) // outputs pairs of type ((word1, word2), 1)
   .reduceByKey((a, b) => (a + b).toShort)
   .filter({ case ((x, y), count) => count >= 5 })

My cluster has 8 nodes, each with 129 GB of RAM and 16 cores per node. One node I keep for the master, 7 nodes for the workers.

my conf:

conf.set("spark.cores.max", "128")
conf.set("spark.akka.frameSize", "1024")
conf.set("spark.executor.memory", "115g")
conf.set("spark.shuffle.file.buffer.kb", "1000")

my spark-env.sh:

ulimit -n 20
SPARK_JAVA_OPTS="-Xss1g -Xmx129g -d64 -XX:-UseGCOverheadLimit -XX:-UseCompressedOops"
SPARK_DRIVER_MEMORY=129G

spark version: 1.1.1

Thank you a lot for your help!

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-tp23098p23108.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
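One aside on the code quoted above: (a + b).toShort wraps silently once a pair's count exceeds 32767, so an Int accumulator is safer. A hedged re-sketch of the same job under that change (rdd and outputPairsOfWords are the poster's values, assumed here to exist and to emit ((word1, word2), 1) pairs with Int counts):

```scala
import org.apache.spark.storage.StorageLevel

// Same counting pipeline, accumulating in Int to avoid Short overflow.
// reduceByKey combines map-side, so at most one record per distinct pair
// per partition reaches the shuffle.
val pairCounts = rdd
  .persist(StorageLevel.MEMORY_AND_DISK)
  .flatMap(x => outputPairsOfWords(x))   // emits ((word1, word2), 1)
  .reduceByKey(_ + _)
  .filter { case (_, count) => count >= 5 }
```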
Re: flatMap output on disk / flatMap memory overhead
You could try rdd.persist(MEMORY_AND_DISK/DISK_ONLY).flatMap(...). I think the StorageLevel MEMORY_AND_DISK means Spark will try to keep the data in memory, and if there isn't sufficient space then it will be spilled to disk.

Thanks
Best Regards

On Mon, Jun 1, 2015 at 11:02 PM, octavian.ganea wrote:
> [snip: original message appears in full below]
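Akhil's reading matches the documented semantics: persist(MEMORY_AND_DISK) spills partitions that don't fit in memory to disk as they are computed; it does not require the whole RDD in memory first. A small sketch contrasting the two levels he mentions (the local setup and toy data are illustrative assumptions):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val sc   = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("persist-levels"))
val data = sc.parallelize(1 to 100000)

// MEMORY_AND_DISK: keep deserialized partitions in memory, spill the rest.
val mixed  = data.map(_ * 2).persist(StorageLevel.MEMORY_AND_DISK)

// DISK_ONLY: write every partition to local disk, bypassing the memory
// cache entirely -- useful when the data is known not to fit.
val onDisk = data.map(_ * 2).persist(StorageLevel.DISK_ONLY)

mixed.count(); onDisk.count() // materialize both cached RDDs
```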
flatMap output on disk / flatMap memory overhead
Hi,

Is there any way to force the output RDD of a flatMap op to be stored in both memory and disk as it is computed? My RAM would not be able to fit the entire output of flatMap, so it really needs to start using disk after the RAM gets full. I didn't find any way to force this.

Also, what is the memory overhead of flatMap? From my computations, the output RDD should fit in memory, but I get the following error after a while (and I know it's because of memory issues, since running the program with 1/3 of the input data finishes successfully):

15/06/01 19:02:49 ERROR BlockFetcherIterator$BasicBlockFetcherIterator: Could not get block(s) from ConnectionManagerId(dco-node036-mgt.dco.ethz.ch,57478)
java.io.IOException: sendMessageReliably failed because ack was not received within 60 sec
        at org.apache.spark.network.ConnectionManager$$anon$10$$anonfun$run$15.apply(ConnectionManager.scala:866)
        at org.apache.spark.network.ConnectionManager$$anon$10$$anonfun$run$15.apply(ConnectionManager.scala:865)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.network.ConnectionManager$$anon$10.run(ConnectionManager.scala:865)
        at io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:581)
        at io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:656)
        at io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:367)
        at java.lang.Thread.run(Thread.java:745)

I've also seen this:
https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence
but my understanding is that one should apply something like rdd.flatMap(...).persist(MEMORY_AND_DISK), which assumes that the entire output of flatMap is first stored in memory (which is not possible in my case) and, only when it's done, is stored on disk. Please correct me if I'm wrong. Anyway, I've tried using this, but I got the same error.

My config:

conf.set("spark.cores.max", "128")
conf.set("spark.akka.frameSize", "1024")
conf.set("spark.executor.memory", "125g")
conf.set("spark.shuffle.file.buffer.kb", "1000")
conf.set("spark.shuffle.consolidateFiles", "true")

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-tp23098.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org