Re: flatMap output on disk / flatMap memory overhead

2015-08-01 Thread Puneet Kapoor
Hi Octavian,

Just out of curiosity, did you try persisting your RDD in a serialized format,
i.e. MEMORY_AND_DISK_SER or MEMORY_ONLY_SER? That is, changing your
rdd.persist(MEMORY_AND_DISK)
to
rdd.persist(MEMORY_ONLY_SER)
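
For reference, here is a minimal sketch of that change (assuming rdd is the
same RDD as in your code quoted below; serialized storage trades some CPU for
a much smaller heap footprint per cached partition):

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Serialized storage keeps each cached partition as a single byte buffer,
// which usually lowers heap usage and GC pressure compared to caching
// deserialized Java objects. Registering Kryo via
// spark.serializer=org.apache.spark.serializer.KryoSerializer can shrink the
// serialized blocks further.
def persistSerialized[T](rdd: RDD[T]): RDD[T] =
  rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)  // or StorageLevel.MEMORY_ONLY_SER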

Regards

On Wed, Jun 10, 2015 at 7:27 AM, Imran Rashid iras...@cloudera.com wrote:

 I agree with Richard.  It looks like the issue here is shuffling, and
 shuffle data is always written to disk, so the issue is definitely not that
 all the output of flatMap has to be stored in memory.

 If at all possible, I'd first suggest upgrading to a newer version of Spark --
 even in 1.2 there were big improvements to shuffle, with sort-based shuffle as
 the default.

 On Tue, Jun 2, 2015 at 1:09 PM, Richard Marscher rmarsc...@localytics.com
  wrote:

 Are you sure it's memory related? What is the disk utilization and IO
 performance on the workers? The error you posted looks to be related to
 shuffle trying to obtain block data from another worker node and failing to
 do so in a reasonable amount of time. It may still be memory related, but I'm
 not sure that other resources are ruled out yet.

 On Tue, Jun 2, 2015 at 5:10 AM, octavian.ganea 
 octavian.ga...@inf.ethz.ch wrote:

 I tried using reduceByKey, without success.

 I also tried this: rdd.persist(MEMORY_AND_DISK).flatMap(...).reduceByKey.
 However, I got the same error as before, namely the error described here:

 http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-td23098.html

 My task is to count the frequencies of pairs of words that occur at least 5
 times in a set of documents. I know that this final output is sparse and
 should comfortably fit in memory. However, the intermediate pairs emitted by
 flatMap might need to be stored on disk, but I don't understand why the
 persist option does not work and my job fails.

 My code:

 rdd.persist(StorageLevel.MEMORY_AND_DISK)
   .flatMap(x => outputPairsOfWords(x)) // outputs pairs of type ((word1, word2), 1)
   .reduceByKey((a, b) => (a + b).toShort)
   .filter { case ((x, y), count) => count >= 5 }


 My cluster has 8 nodes, each with 129 GB of RAM and 16 cores. I keep one node
 for the master and 7 nodes for the workers.

 my conf:

 conf.set("spark.cores.max", "128")
 conf.set("spark.akka.frameSize", "1024")
 conf.set("spark.executor.memory", "115g")
 conf.set("spark.shuffle.file.buffer.kb", "1000")

 my spark-env.sh:
  ulimit -n 20
  SPARK_JAVA_OPTS="-Xss1g -Xmx129g -d64 -XX:-UseGCOverheadLimit -XX:-UseCompressedOops"
  SPARK_DRIVER_MEMORY=129G

 spark version: 1.1.1

 Thank you a lot for your help!




Re: flatMap output on disk / flatMap memory overhead

2015-06-09 Thread Imran Rashid
I agree with Richard.  It looks like the issue here is shuffling, and
shuffle data is always written to disk, so the issue is definitely not that
all the output of flatMap has to be stored in memory.

If at all possible, I'd first suggest upgrading to a newer version of Spark --
even in 1.2 there were big improvements to shuffle, with sort-based shuffle as
the default.
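
If an upgrade is not an option right away, the sort-based shuffle can already
be switched on in 1.1 -- a sketch, assuming you stay on 1.1.1 for now:

import org.apache.spark.SparkConf

// The sort-based shuffle exists as an opt-in in Spark 1.1.x and becomes the
// default in 1.2; this selects it explicitly.
val conf = new SparkConf().set("spark.shuffle.manager", "sort")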

On Tue, Jun 2, 2015 at 1:09 PM, Richard Marscher rmarsc...@localytics.com
wrote:

 Are you sure it's memory related? What is the disk utilization and IO
 performance on the workers? The error you posted looks to be related to
 shuffle trying to obtain block data from another worker node and failing to
 do so in a reasonable amount of time. It may still be memory related, but I'm
 not sure that other resources are ruled out yet.

 On Tue, Jun 2, 2015 at 5:10 AM, octavian.ganea octavian.ga...@inf.ethz.ch
  wrote:

 I tried using reduceByKey, without success.

 I also tried this: rdd.persist(MEMORY_AND_DISK).flatMap(...).reduceByKey.
 However, I got the same error as before, namely the error described here:

 http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-td23098.html

 My task is to count the frequencies of pairs of words that occur at least 5
 times in a set of documents. I know that this final output is sparse and
 should comfortably fit in memory. However, the intermediate pairs emitted by
 flatMap might need to be stored on disk, but I don't understand why the
 persist option does not work and my job fails.

 My code:

 rdd.persist(StorageLevel.MEMORY_AND_DISK)
   .flatMap(x => outputPairsOfWords(x)) // outputs pairs of type ((word1, word2), 1)
   .reduceByKey((a, b) => (a + b).toShort)
   .filter { case ((x, y), count) => count >= 5 }


 My cluster has 8 nodes, each with 129 GB of RAM and 16 cores. I keep one node
 for the master and 7 nodes for the workers.

 my conf:

 conf.set("spark.cores.max", "128")
 conf.set("spark.akka.frameSize", "1024")
 conf.set("spark.executor.memory", "115g")
 conf.set("spark.shuffle.file.buffer.kb", "1000")

 my spark-env.sh:
  ulimit -n 20
  SPARK_JAVA_OPTS="-Xss1g -Xmx129g -d64 -XX:-UseGCOverheadLimit -XX:-UseCompressedOops"
  SPARK_DRIVER_MEMORY=129G

 spark version: 1.1.1

 Thank you a lot for your help!





Re: flatMap output on disk / flatMap memory overhead

2015-06-02 Thread Akhil Das
You could try rdd.persist(MEMORY_AND_DISK) or rdd.persist(DISK_ONLY) before the
flatMap(...). I think StorageLevel MEMORY_AND_DISK means Spark will try to keep
the data in memory, and if there isn't sufficient space it will be written to
disk.
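
A minimal sketch of what that looks like (illustrative only; docs stands in for
the real input RDD):

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// MEMORY_AND_DISK keeps the partitions that fit in memory and writes the rest
// to local disk; DISK_ONLY keeps every cached partition on disk.
def cacheBeforeFlatMap(docs: RDD[String]): RDD[String] =
  docs.persist(StorageLevel.MEMORY_AND_DISK)  // or StorageLevel.DISK_ONLY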

Thanks
Best Regards

On Mon, Jun 1, 2015 at 11:02 PM, octavian.ganea octavian.ga...@inf.ethz.ch
wrote:

 Hi,

 Is there any way to force the output RDD of a flatMap op to be stored in
 both memory and disk as it is computed? My RAM would not be able to fit the
 entire output of flatMap, so it really needs to start using disk after the
 RAM gets full. I didn't find any way to force this.

 Also, what is the memory overhead of flatMap? From my computations, the
 output RDD should fit in memory, but I get the following error after a while
 (and I know it's because of memory issues, since running the program with
 1/3 of the input data finishes successfully):

 15/06/01 19:02:49 ERROR BlockFetcherIterator$BasicBlockFetcherIterator:
 Could not get block(s) from ConnectionManagerId(dco-node036-mgt.dco.ethz.ch,57478)
 java.io.IOException: sendMessageReliably failed because ack was not received within 60 sec
     at org.apache.spark.network.ConnectionManager$$anon$10$$anonfun$run$15.apply(ConnectionManager.scala:866)
     at org.apache.spark.network.ConnectionManager$$anon$10$$anonfun$run$15.apply(ConnectionManager.scala:865)
     at scala.Option.foreach(Option.scala:236)
     at org.apache.spark.network.ConnectionManager$$anon$10.run(ConnectionManager.scala:865)
     at io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:581)
     at io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:656)
     at io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:367)
     at java.lang.Thread.run(Thread.java:745)


 I've also seen this:
 https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence
 but my understanding is that one should apply something like
 rdd.flatMap(...).persist(MEMORY_AND_DISK), which assumes that the entire
 output of flatMap is first stored in memory (which is not possible in my
 case) and only when that is done is it stored on disk. Please correct me if
 I'm wrong. Anyway, I've tried using this, but I got the same error.
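
 To make the distinction concrete, this is a sketch of the two placements I am
 comparing (splitting on whitespace is only a stand-in for my real flatMap
 function):

 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel

 def persistPlacement(docs: RDD[String]): Unit = {
   // Persist placed on the input: caches the partitions that feed the flatMap.
   docs.persist(StorageLevel.MEMORY_AND_DISK)

   // Persist placed on the output: caches the result of the flatMap itself.
   docs.flatMap(_.split("\\s+")).persist(StorageLevel.MEMORY_AND_DISK)
 }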

 My config:

 conf.set("spark.cores.max", "128")
 conf.set("spark.akka.frameSize", "1024")
 conf.set("spark.executor.memory", "125g")
 conf.set("spark.shuffle.file.buffer.kb", "1000")
 conf.set("spark.shuffle.consolidateFiles", "true")





Re: flatMap output on disk / flatMap memory overhead

2015-06-02 Thread octavian.ganea
I tried using reduceByKey, without success.

I also tried this: rdd.persist(MEMORY_AND_DISK).flatMap(...).reduceByKey.
However, I got the same error as before, namely the error described here: 
http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-td23098.html

My task is to count the frequencies of pairs of words that occur at least 5
times in a set of documents. I know that this final output is sparse and
should comfortably fit in memory. However, the intermediate pairs emitted by
flatMap might need to be stored on disk, but I don't understand why the
persist option does not work and my job fails.

My code:

rdd.persist(StorageLevel.MEMORY_AND_DISK)
  .flatMap(x => outputPairsOfWords(x)) // outputs pairs of type ((word1, word2), 1)
  .reduceByKey((a, b) => (a + b).toShort)
  .filter { case ((x, y), count) => count >= 5 }
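
For context, a hypothetical sketch of what outputPairsOfWords does (my real
implementation is not shown here; this is only the general shape of it):

// Emit each unordered pair of distinct words in a document with a count of 1.
def outputPairsOfWords(doc: String): Seq[((String, String), Short)] = {
  val words = doc.split("\\s+").distinct.sorted
  for {
    i <- 0 until words.length
    j <- (i + 1) until words.length
  } yield ((words(i), words(j)), 1.toShort)
}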
 

My cluster has 8 nodes, each with 129 GB of RAM and 16 cores. I keep one node
for the master and 7 nodes for the workers.

my conf:

conf.set("spark.cores.max", "128")
conf.set("spark.akka.frameSize", "1024")
conf.set("spark.executor.memory", "115g")
conf.set("spark.shuffle.file.buffer.kb", "1000")

my spark-env.sh:
 ulimit -n 20
 SPARK_JAVA_OPTS="-Xss1g -Xmx129g -d64 -XX:-UseGCOverheadLimit -XX:-UseCompressedOops"
 SPARK_DRIVER_MEMORY=129G

spark version: 1.1.1

Thank you a lot for your help!






Re: flatMap output on disk / flatMap memory overhead

2015-06-02 Thread Richard Marscher
Are you sure it's memory related? What is the disk utilization and IO
performance on the workers? The error you posted looks to be related to
shuffle trying to obtain block data from another worker node and failing to
do so in a reasonable amount of time. It may still be memory related, but I'm
not sure that other resources are ruled out yet.

On Tue, Jun 2, 2015 at 5:10 AM, octavian.ganea octavian.ga...@inf.ethz.ch
wrote:

 I tried using reduceByKey, without success.

 I also tried this: rdd.persist(MEMORY_AND_DISK).flatMap(...).reduceByKey.
 However, I got the same error as before, namely the error described here:

 http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-td23098.html

 My task is to count the frequencies of pairs of words that occur at least 5
 times in a set of documents. I know that this final output is sparse and
 should comfortably fit in memory. However, the intermediate pairs emitted by
 flatMap might need to be stored on disk, but I don't understand why the
 persist option does not work and my job fails.

 My code:

 rdd.persist(StorageLevel.MEMORY_AND_DISK)
   .flatMap(x => outputPairsOfWords(x)) // outputs pairs of type ((word1, word2), 1)
   .reduceByKey((a, b) => (a + b).toShort)
   .filter { case ((x, y), count) => count >= 5 }


 My cluster has 8 nodes, each with 129 GB of RAM and 16 cores. I keep one node
 for the master and 7 nodes for the workers.

 my conf:

 conf.set("spark.cores.max", "128")
 conf.set("spark.akka.frameSize", "1024")
 conf.set("spark.executor.memory", "115g")
 conf.set("spark.shuffle.file.buffer.kb", "1000")

 my spark-env.sh:
  ulimit -n 20
  SPARK_JAVA_OPTS="-Xss1g -Xmx129g -d64 -XX:-UseGCOverheadLimit -XX:-UseCompressedOops"
  SPARK_DRIVER_MEMORY=129G

 spark version: 1.1.1

 Thank you a lot for your help!


